In [96]:
import pandas as pd
import numpy as np
import xgboost as xgb
import dalex as dx

from sklearn.pipeline import make_pipeline
from sklearn.compose import make_column_transformer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split

from sklearn.impute import SimpleImputer

import xgboost as xgb
import numpy as np
from typing import Tuple
from sklearn.metrics import balanced_accuracy_score, accuracy_score

import sympy as sp

In [2]:
titanic = dx.datasets.load_titanic()
X = titanic.drop(columns='survived')
y = titanic.survived

In [3]:
X_transformed = pd.get_dummies(X, drop_first=True)
display(X.head())
display(X_transformed.head())

Unnamed: 0,gender,age,class,embarked,fare,sibsp,parch
0,male,42.0,3rd,Southampton,7.11,0,0
1,male,13.0,3rd,Southampton,20.05,0,2
2,male,16.0,3rd,Southampton,20.05,1,1
3,female,39.0,3rd,Southampton,20.05,1,1
4,female,16.0,3rd,Southampton,7.13,0,0


Unnamed: 0,age,fare,sibsp,parch,gender_male,class_2nd,class_3rd,class_deck crew,class_engineering crew,class_restaurant staff,class_victualling crew,embarked_Cherbourg,embarked_Queenstown,embarked_Southampton
0,42.0,7.11,0,0,True,False,True,False,False,False,False,False,False,True
1,13.0,20.05,0,2,True,False,True,False,False,False,False,False,False,True
2,16.0,20.05,1,1,True,False,True,False,False,False,False,False,False,True
3,39.0,20.05,1,1,False,False,True,False,False,False,False,False,False,True
4,16.0,7.13,0,0,False,False,True,False,False,False,False,False,False,True


In [4]:
X_train, X_test, y_train, y_test = train_test_split(X_transformed, y, test_size=0.2, random_state=42)

In [5]:
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

In [68]:
y, y_hut = sp.symbols('y y_hut')
lambda_reg = sp.symbols('lambda_reg')
f = sp.Function('f')(y, y_hut, lambda_reg)

f = (y_hut-y)**2 + lambda_reg*(y_hut**2)

grad = sp.lambdify((y, y_hut, lambda_reg), f.diff(y_hut), 'numpy')
hess = sp.lambdify((y, y_hut, lambda_reg), f.diff(y_hut, y_hut), 'numpy')

y_pred = dtrain.get_label()
y_rel = dtrain.get_label()

lambda_r = 0.1
grad, hess = grad(y=y_rel, y_hut=y_pred, lambda_reg=lambda_r), hess(y=y_rel, y_hut=y_pred, lambda_reg=lambda_r)

if type(grad) == float:
    grad = np.ones(y_pred.shape[0]) * grad

if type(hess) == float:
    hess = np.ones(y_pred.shape[0]) * hess


((1765,), (1765,))

In [110]:
def XGBoost_changed(lambda_r: float, dtrain: xgb.DMatrix) -> xgb.Booster:
    def objective_function(y_pred: np.ndarray, dtrain_: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
        y, y_hut = sp.symbols('y y_hut')
        lambda_reg = sp.symbols('lambda_reg')
        f = sp.Function('f')(y, y_hut, lambda_reg)

        f = (y_hut-y)**4 + lambda_reg*(y_hut**2)

        grad = sp.lambdify((y, y_hut, lambda_reg), f.diff(y_hut), 'numpy')
        hess = sp.lambdify((y, y_hut, lambda_reg), f.diff(y_hut, y_hut), 'numpy')

        y_rel = dtrain_.get_label()

        grad, hess = grad(y=y_rel, y_hut=y_pred, lambda_reg=lambda_r), hess(y=y_rel, y_hut=y_pred, lambda_reg=lambda_r)

        if type(grad) in [float, int]:
            grad = np.ones(y_pred.shape[0]) * grad
        if type(hess) in [float, int]:
            hess = np.ones(y_pred.shape[0]) * hess
        
        return grad, hess
    
    params = {
        # 'objective': 'reg:squarederror',
        'tree_method': 'hist',
        'seed': 2001,
    }

    model = xgb.train(
        params=params,
        dtrain=dtrain,
        num_boost_round=100,
        obj=objective_function
    )
    return model

In [111]:
for lambda_r in np.logspace(-3, 1, 14):
    model = XGBoost_changed(float(lambda_r), dtrain)
    set = dtest
    pred = model.predict(set)
    print("Dla lambda", lambda_r, "to: ", balanced_accuracy_score(set.get_label(), pred > 0.5))

Dla lambda 0.001 to:  0.7530272692601068
Dla lambda 0.002030917620904735 to:  0.7345776125095347
Dla lambda 0.004124626382901352 to:  0.723064454614798
Dla lambda 0.008376776400682925 to:  0.7349113272311212
Dla lambda 0.017012542798525893 to:  0.7164616704805492
Dla lambda 0.0345510729459222 to:  0.7315980167810832
Dla lambda 0.07017038286703829 to:  0.7227069031273836
Dla lambda 0.14251026703029993 to:  0.6851401601830663
Dla lambda 0.28942661247167517 to:  0.6426392067124332
Dla lambda 0.5878016072274912 to:  0.5615942028985508
Dla lambda 1.1937766417144369 to:  0.5072463768115942
Dla lambda 2.424462017082331 to:  0.5
Dla lambda 4.923882631706742 to:  0.5
Dla lambda 10.0 to:  0.5


In [None]:
df = pd.DataFrame({'lambda_reg': np.arange(0, 2, 0.025)})
df['test'] = df['lambda_reg'].apply(lambda x: score(x, dtest))
df['train'] = df['lambda_reg'].apply(lambda x: score(x, dtrain))

In [None]:
df.plot(x='lambda_reg', y=['test', 'train'], title='Regularization impact on accuracy')

In [None]:
balanced_accuracy_score(y_train, preds > 0.5)

In [None]:
from xgboost import XGBClassifier

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import balanced_accuracy_score

data = load_iris()
X_train, X_test, y_train, y_test = train_test_split(data['data'], data['target'], test_size=.2)

bst = XGBClassifier(n_estimators=2, max_depth=2, learning_rate=1, objective=squared_log)
                    # 'binary:logistic')

bst.fit(X_train, y_train)

preds = bst.predict(X_test)

balanced_accuracy_score(y_test, preds)

In [None]:
from sklearn.datasets import make_classification
num_classes = 3
X, y = make_classification(n_samples=1000, n_informative=5,
                           n_classes=num_classes)
dtrain = xgb.DMatrix(data=X, label=y)
num_parallel_tree = 4
num_boost_round = 16
# total number of built trees is num_parallel_tree * num_classes * num_boost_round

# We build a boosted random forest for classification here.
booster = xgb.train({
    'num_parallel_tree': 4, 'subsample': 0.5, 'num_class': 3},
                    num_boost_round=num_boost_round, dtrain=dtrain)

# This is the sliced model, containing [3, 7) forests
# step is also supported with some limitations like negative step is invalid.
sliced: xgb.Booster = booster[3:7]

# Access individual tree layer
trees = [_ for _ in booster]
assert len(trees) == num_boost_round

print(trees)

In [310]:
import numpy as np
import xgboost as xgb
from typing import Tuple

def gradient(predt: np.ndarray, dtrain: xgb.DMatrix) -> np.ndarray:
    '''Compute the gradient squared log error.'''
    y = dtrain.get_label()
    return (np.log1p(predt) - np.log1p(y)) / (predt + 1)

def hessian(predt: np.ndarray, dtrain: xgb.DMatrix) -> np.ndarray:
    '''Compute the hessian for squared log error.'''
    y = dtrain.get_label()
    return ((-np.log1p(predt) + np.log1p(y) + 1) /
            np.power(predt + 1, 2))

def squared_log(predt: np.ndarray,
                dtrain: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
    '''Squared Log Error objective. A simplified version for RMSLE used as
    objective function.
    '''
    predt[predt < -1] = -1 + 1e-6
    grad = gradient(predt, dtrain)
    hess = hessian(predt, dtrain)
    return grad, hess

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Zakodowanie kolumn i zbudowanie pipeline
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), ['age', 'fare', 'sibsp', 'parch']),
        ('cat', OneHotEncoder(), ['gender', 'class', 'embarked'])
    ]
)

pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', xgb.XGBClassifier(objective='binary:logistic'))
])

# Dopasowanie modelu
pipeline.fit(X_train, y_train)

# Prognozowanie
preds = pipeline.predict(X_test)


In [308]:
label = pd.DataFrame(y)
dtrain = xgb.DMatrix(X_transformed, label=label)

In [315]:
import numpy as np
import xgboost as xgb
from typing import Tuple

def gradient(predt: np.ndarray, dtrain: xgb.DMatrix) -> np.ndarray:
    '''Compute the gradient squared log error.'''
    y = dtrain.get_label()
    return (np.log1p(predt) - np.log1p(y)) / (predt + 1)

def hessian(predt: np.ndarray, dtrain: xgb.DMatrix) -> np.ndarray:
    '''Compute the hessian for squared log error.'''
    y = dtrain.get_label()
    return ((-np.log1p(predt) + np.log1p(y) + 1) /
            np.power(predt + 1, 2))

def squared_log(predt: np.ndarray,
                dtrain: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
    '''Squared Log Error objective. A simplified version for RMSLE used as
    objective function.
    '''
    predt[predt < -1] = -1 + 1e-6
    grad = gradient(predt, dtrain)
    hess = hessian(predt, dtrain)
    return grad, hess

In [140]:
X_train, X_test, y_train, y_test = train_test_split(X_transformed, y, test_size=.2, random_state=123)

In [None]:
xgbc = xgb.XGBClassifier(objective=squared_log)
xgbc.fit(X_train, y_train)

# print(xgbc.score(X_test, y_test))
# print(xgbc.score(X_test, y_test))
# print(xgbc.score(X_train, y_train))

xgbc.train({'learning_rate': 0.1, 'max_depth': 2, 'objective': 'binary:logistic'}, dtrain, num_boost_round=10)



In [None]:
squared_log = xgb.train({'tree_method': 'hist', 'seed': 1994},  # any other tree method is fine.
           dtrain=dtrain,
           num_boost_round=100
           )

xgbc = xgb.XGBClassifier(booster=squared_log)
xgbc.fit(X_train, y_train)
# print(xgbc.score(X_test, y_test))
# print(xgbc.score(X_train, y_train))

In [331]:
def softprob_obj(labels: np.ndarray, predt: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    rows = labels.shape[0]
    classes = predt.shape[1]
    grad = np.zeros((rows, classes), dtype=float)
    hess = np.zeros((rows, classes), dtype=float)
    eps = 1e-6
    for r in range(predt.shape[0]):
        target = labels[r]
        p = softmax(predt[r, :])
        for c in range(predt.shape[1]):
            g = p[c] - 1.0 if c == target else p[c]
            h = max((2.0 * p[c] * (1.0 - p[c])).item(), eps)
            grad[r, c] = g
            hess[r, c] = h

    grad = grad.reshape((rows * classes, 1))
    hess = hess.reshape((rows * classes, 1))
    return grad, hess

clf = xgb.XGBClassifier(tree_method="hist", objective=softprob_obj)

In [None]:
from sklearn.datasets import load_diabetes
from sklearn.metrics import mean_absolute_error
X, y = load_diabetes(return_X_y=True)
reg = xgb.XGBRegressor(
    tree_method="hist",
    eval_metric=mean_absolute_error,
    objective=softprob_obj
)
reg.fit(X, y, eval_set=[(X, y)])

In [None]:
xgbc = xgb.XGBClassifier(objective=)

In [None]:
numerical_features = list(X.dtypes[(X.dtypes != 'object') & (X.dtypes != 'category')].index)
categorical_features = list(X.dtypes[(X.dtypes == 'object') | (X.dtypes == 'category')].index)

numerical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy = 'mean')),
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy = 'most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown = 'ignore'))
])

preprocessor = ColumnTransformer([
    ('categorical', categorical_transformer, categorical_features),
    ('numerical', numerical_transformer, numerical_features)
])

pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', XGBClassifier(n_estimators=2, max_depth=2, learning_rate=1, objective='binary:logistic'))
])

pipeline.fit(X_train, y_train)

In [None]:
preds = pipeline.predict(X_test)

print(pipeline.score(X_test, y_test))
print(pipeline.score(X_train, y_train))

In [145]:
preprocess = make_column_transformer(
    (StandardScaler(), ['age', 'fare', 'parch', 'sibsp']),
    (OneHotEncoder(), ['gender', 'class', 'embarked']))

titanic_rf = make_pipeline(
    preprocess,
    RandomForestClassifier(max_depth = 3, n_estimators = 500))
titanic_rf.fit(X, y)

titanic_lr = make_pipeline(
    preprocess,
    LogisticRegression(penalty = 'l2'))
titanic_lr.fit(X, y)

titanic_rf = make_pipeline(
    preprocess,
    RandomForestClassifier(max_depth = 3, n_estimators = 500))
titanic_rf.fit(X, y)

titanic_gbc = make_pipeline(
    preprocess,
    GradientBoostingClassifier(n_estimators = 100))
titanic_gbc.fit(X, y)


titanic_svm = make_pipeline(
    preprocess,
    SVC(probability = True))
titanic_svm.fit(X, y)



henry = pd.DataFrame({'gender'   : ['male'],
                       'age'     : [47],
                       'class'   : ['1st'],
                       'embarked': ['Cherbourg'],
                       'fare'    : [25],
                       'sibsp'   : [0],
                       'parch'   : [0]},
                      index = ['Henry'])

In [347]:
import numpy as np
import xgboost as xgb
from typing import Tuple

def gradient(predt: np.ndarray, dtrain: xgb.DMatrix) -> np.ndarray:
    '''Compute the gradient squared log error.'''
    y = dtrain.get_label()
    return (np.log1p(predt) - np.log1p(y)) / (predt + 1)

def hessian(predt: np.ndarray, dtrain: xgb.DMatrix) -> np.ndarray:
    '''Compute the hessian for squared log error.'''
    y = dtrain.get_label()
    return ((-np.log1p(predt) + np.log1p(y) + 1) /
            np.power(predt + 1, 2))

def squared_log(predt: np.ndarray,
                dtrain: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
    '''Squared Log Error objective. A simplified version for RMSLE used as
    objective function.
    '''
    predt[predt < -1] = -1 + 1e-6
    grad = gradient(predt, dtrain)
    hess = hessian(predt, dtrain)
    return grad, hess

In [348]:
dtrain = xgb.DMatrix(X_train, label=y_train)

In [None]:
xgbc = xgb.XGBClassifier(objective=squared_log, dtrain=dtrain)
# xgbc.fit(X_train, y_train)

In [None]:
import sympy as sp

y, y_pred = sp.symbols('y y_pred')
f = sp.Function('f')(y, y_pred)

# Create a loss function
f = (y - y_pred)**2

grad = f.gra
# hess = grad.diff(y_pred)

grad

In [None]:
from sympy import Function, hessian, pprint
from sympy.abc import x, y
f = Function('f')(x, y)
g1 = Function('g')(x, y)
g2 = x**2 + 3*y + 6*x**23 * y**3

pprint(hessian(g2, (x, y)))

In [None]:
sympy

In [227]:
import numpy as np
import xgboost as xgb
from typing import Tuple

def gradient_mse(predt: np.ndarray, dtrain: xgb.DMatrix) -> np.ndarray:
    '''Compute the gradient mean squared error.'''
    y = dtrain.get_label()
    return 2 * (predt - y)

def hessian_mse(predt: np.ndarray, dtrain: xgb.DMatrix, lambda_reg: int) -> np.ndarray:
    '''Compute the hessian for mean squared error.'''
    y = dtrain.get_label()
    return 2 * np.ones_like(y) + lambda_reg

def mean_squared_error(predt: np.ndarray,
                dtrain: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
    '''Mean squared error objective'''
    grad = gradient_mse(predt, dtrain)
    hess = hessian_mse(predt, dtrain, 1000)
    return grad, hess

In [None]:
label = pd.DataFrame(y)
dtrain = xgb.DMatrix(X_transformed, label=label)

gradient_mse(predt = y, dtrain = dtrain)

In [213]:
def rmsle(predt: np.ndarray, dtrain: xgb.DMatrix) -> Tuple[str, float]:
    ''' Root mean squared log error metric.'''
    y = dtrain.get_label()
    predt[predt < -1] = -1 + 1e-6
    elements = np.power(np.log1p(y) - np.log1p(predt), 2)
    return 'MSE', float(np.sqrt(np.sum(elements) / len(y)))

In [None]:
xgb.train({'tree_method': 'hist', 'seed': 1994,
           'gamma': 1},
           dtrain=dtrain,
           num_boost_round=20,
           obj=mean_squared_error,
        #    custom_metric=rmsle,
           evals=[(dtrain, 'train'), (dtrain, 'test')]
           )


In [148]:
import numpy as np
from scipy.special import loggamma, psi as digamma, polygamma
trigamma = lambda x: polygamma(1, x)

def dirichlet_fun(pred: np.ndarray, Y: np.ndarray) -> float:
    epred = np.exp(pred)
    sum_epred = np.sum(epred, axis=1, keepdims=True)
    return (
        loggamma(epred).sum()
        - loggamma(sum_epred).sum()
        - np.sum(np.log(Y) * (epred - 1))
    )
def dirichlet_grad(pred: np.ndarray, Y: np.ndarray) -> np.ndarray:
    epred = np.exp(pred)
    return epred * (
        digamma(epred)
        - digamma(np.sum(epred, axis=1, keepdims=True))
        - np.log(Y)
    )
def dirichlet_hess(pred: np.ndarray, Y: np.ndarray) -> np.ndarray:
    epred = np.exp(pred)
    grad = dirichlet_grad(pred, Y)
    k = Y.shape[1]
    H = np.empty((pred.shape[0], k, k))
    for row in range(pred.shape[0]):
        H[row, :, :] = (
            - trigamma(epred[row].sum()) * np.outer(epred[row], epred[row])
            + np.diag(grad[row] + trigamma(epred[row]) * epred[row] ** 2)
        )
    return H

In [149]:
from math import isclose
from scipy import stats
from scipy.optimize import check_grad
from scipy.special import softmax

def gen_random_dirichlet(rng: np.random.Generator, m: int, k: int):
    alpha = np.exp(rng.standard_normal(size=k))
    return rng.dirichlet(alpha, size=m)

def test_dirichlet_fun_grad_hess():
    k = 3
    m = 10
    rng = np.random.default_rng(seed=123)
    Y = gen_random_dirichlet(rng, m, k)
    x0 = rng.standard_normal(size=k)
    for row in range(Y.shape[0]):
        fun_row = dirichlet_fun(x0.reshape((1,-1)), Y[[row]])
        ref_logpdf = stats.dirichlet.logpdf(
            Y[row] / Y[row].sum(), # <- avoid roundoff error
            np.exp(x0),
        )
        assert isclose(fun_row, -ref_logpdf)

        gdiff = check_grad(
            lambda pred: dirichlet_fun(pred.reshape((1,-1)), Y[[row]]),
            lambda pred: dirichlet_grad(pred.reshape((1,-1)), Y[[row]]),
            x0
        )
        assert gdiff <= 1e-6

        H_numeric = np.empty((k,k))
        eps = 1e-7
        for ii in range(k):
            x0_plus_eps = x0.reshape((1,-1)).copy()
            x0_plus_eps[0,ii] += eps
            for jj in range(k):
                H_numeric[ii, jj] = (
                    dirichlet_grad(x0_plus_eps, Y[[row]])[0][jj]
                    - dirichlet_grad(x0.reshape((1,-1)), Y[[row]])[0][jj]
                ) / eps
        H = dirichlet_hess(x0.reshape((1,-1)), Y[[row]])[0]
        np.testing.assert_almost_equal(H, H_numeric, decimal=6)
test_dirichlet_fun_grad_hess()

In [150]:
def dirichlet_expected_hess(pred: np.ndarray) -> np.ndarray:
    epred = np.exp(pred)
    k = pred.shape[1]
    Ehess = np.empty((pred.shape[0], k, k))
    for row in range(pred.shape[0]):
        Ehess[row, :, :] = (
            - trigamma(epred[row].sum()) * np.outer(epred[row], epred[row])
            + np.diag(trigamma(epred[row]) * epred[row] ** 2)
        )
    return Ehess
def test_dirichlet_expected_hess():
    k = 3
    rng = np.random.default_rng(seed=123)
    x0 = rng.standard_normal(size=k)
    y_sample = rng.dirichlet(np.exp(x0), size=5_000_000)
    x_broadcast = np.broadcast_to(x0, (y_sample.shape[0], k))
    g_sample = dirichlet_grad(x_broadcast, y_sample)
    ref = (g_sample.T @ g_sample) / y_sample.shape[0]
    Ehess = dirichlet_expected_hess(x0.reshape((1,-1)))[0]
    np.testing.assert_almost_equal(Ehess, ref, decimal=2)
test_dirichlet_expected_hess()

In [151]:
def dirichlet_diag_upper_bound_expected_hess(
    pred: np.ndarray, Y: np.ndarray
) -> np.ndarray:
    Ehess = dirichlet_expected_hess(pred)
    diag_bound_Ehess = np.empty((pred.shape[0], Y.shape[1]))
    for row in range(pred.shape[0]):
        diag_bound_Ehess[row, :] = np.abs(Ehess[row, :, :]).sum(axis=1)
    return diag_bound_Ehess

In [152]:
import xgboost as xgb
from typing import Tuple

def dirichlet_xgb_objective(
    pred: np.ndarray, dtrain: xgb.DMatrix
) -> Tuple[np.ndarray, np.ndarray]:
    Y = dtrain.get_label().reshape(pred.shape)
    return (
        dirichlet_grad(pred, Y),
        dirichlet_diag_upper_bound_expected_hess(pred, Y),
    )


In [153]:
def dirichlet_eval_metric(
    pred: np.ndarray, dtrain: xgb.DMatrix
) -> Tuple[str, float]:
    Y = dtrain.get_label().reshape(pred.shape)
    return "dirichlet_ll", dirichlet_fun(pred, Y)

In [154]:
X = np.array([
    10.4,11.7,12.8,13,15.7,16.3,18,18.7,20.7,22.1,
    22.4,24.4,25.8,32.5,33.6,36.8,37.8,36.9,42.2,47,
    47.1,48.4,49.4,49.5,59.2,60.1,61.7,62.4,69.3,73.6,
    74.4,78.5,82.9,87.7,88.1,90.4,90.6,97.7,103.7,
]).reshape((-1,1))
# sand, silt, clay
Y = np.array([
    [0.775,0.195,0.03], [0.719,0.249,0.032], [0.507,0.361,0.132],
    [0.522,0.409,0.066], [0.7,0.265,0.035], [0.665,0.322,0.013],
    [0.431,0.553,0.016], [0.534,0.368,0.098], [0.155,0.544,0.301],
    [0.317,0.415,0.268], [0.657,0.278,0.065], [0.704,0.29,0.006],
    [0.174,0.536,0.29], [0.106,0.698,0.196], [0.382,0.431,0.187],
    [0.108,0.527,0.365], [0.184,0.507,0.309], [0.046,0.474,0.48],
    [0.156,0.504,0.34], [0.319,0.451,0.23], [0.095,0.535,0.37],
    [0.171,0.48,0.349], [0.105,0.554,0.341], [0.048,0.547,0.41],
    [0.026,0.452,0.522], [0.114,0.527,0.359], [0.067,0.469,0.464],
    [0.069,0.497,0.434], [0.04,0.449,0.511], [0.074,0.516,0.409],
    [0.048,0.495,0.457], [0.045,0.485,0.47], [0.066,0.521,0.413],
    [0.067,0.473,0.459], [0.074,0.456,0.469], [0.06,0.489,0.451],
    [0.063,0.538,0.399], [0.025,0.48,0.495], [0.02,0.478,0.502],
])

In [None]:
Y

In [None]:
from typing import Dict, List

dtrain = xgb.DMatrix(X, label=Y)
results: Dict[str, Dict[str, List[float]]] = {}
booster = xgb.train(
    params={
        "tree_method": "hist",
        "num_target": Y.shape[1],
        "base_score": 0,
        "disable_default_eval_metric": True,
        "max_depth": 3,
        "seed": 123,
    },
    dtrain=dtrain,
    num_boost_round=10,
    obj=dirichlet_xgb_objective,
    evals=[(dtrain, "Train")],
    evals_result=results,
    custom_metric=dirichlet_eval_metric,
)
yhat = softmax(booster.inplace_predict(X), axis=1)

In [157]:
from scipy.optimize import minimize

def get_optimal_intercepts(Y: np.ndarray) -> np.ndarray:
    k = Y.shape[1]
    res = minimize(
        fun=lambda pred: dirichlet_fun(
            np.broadcast_to(pred, (Y.shape[0], k)),
            Y
        ),
        x0=np.zeros(k),
        jac=lambda pred: dirichlet_grad(
            np.broadcast_to(pred, (Y.shape[0], k)),
            Y
        ).sum(axis=0)
    )
    return res["x"]
intercepts = get_optimal_intercepts(Y)

In [None]:
base_margin = np.broadcast_to(intercepts, Y.shape)
dtrain_w_intercept = xgb.DMatrix(X, label=Y, base_margin=base_margin)
results: Dict[str, Dict[str, List[float]]] = {}
booster = xgb.train(
    params={
        "tree_method": "hist",
        "num_target": Y.shape[1],
        "base_score": 0,
        "disable_default_eval_metric": True,
        "max_depth": 3,
        "seed": 123,
    },
    dtrain=dtrain_w_intercept,
    num_boost_round=10,
    obj=dirichlet_xgb_objective,
    evals=[(dtrain, "Train")],
    evals_result=results,
    custom_metric=dirichlet_eval_metric,
)
yhat = softmax(
    booster.predict(
        xgb.DMatrix(X, base_margin=base_margin)
    ),
    axis=1
)