Skip to content

Commit

Permalink
More general predict proba. (dmlc#6817)
Browse files Browse the repository at this point in the history
* Use `output_margin` for `softmax`.
* Add test for dask binary cls.

Co-authored-by: Philip Hyunsu Cho <chohyu01@cs.washington.edu>
  • Loading branch information
trivialfis and hcho3 committed Apr 6, 2021
1 parent 357a78b commit 73f3111
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 40 deletions.
25 changes: 17 additions & 8 deletions python-package/xgboost/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from collections import defaultdict
from collections.abc import Sequence
from threading import Thread
from functools import partial, update_wrapper
from typing import TYPE_CHECKING, List, Tuple, Callable, Optional, Any, Union, Dict, Set
from typing import Awaitable, Generator, TypeVar

Expand Down Expand Up @@ -967,7 +968,7 @@ def _can_output_df(is_df: bool, output_shape: Tuple) -> bool:
return is_df and len(output_shape) <= 2


async def _direct_predict_impl(
async def _direct_predict_impl( # pylint: disable=too-many-branches
mapped_predict: Callable,
booster: "distributed.Future",
data: _DaskCollection,
Expand Down Expand Up @@ -1022,13 +1023,23 @@ async def _direct_predict_impl(
new_axis = list(range(len(output_shape) - 2))
else:
new_axis = [i + 2 for i in range(len(output_shape) - 2)]
if len(output_shape) == 2:
# Somehow dask fails to infer the output shape change for 2-dim prediction, and
# `chunks = (None, output_shape[1])` doesn't work because None is not
# supported in map_blocks.
chunks = list(data.chunks)
chunks[1] = (output_shape[1], )
else:
chunks = None
predictions = da.map_blocks(
mapped_predict,
booster,
data,
False,
columns,
base_margin_array,

chunks=chunks,
drop_axis=drop_axis,
new_axis=new_axis,
dtype=numpy.float32,
Expand Down Expand Up @@ -1776,28 +1787,27 @@ async def _predict_proba_async(
self,
X: _DaskCollection,
validate_features: bool,
output_margin: bool,
base_margin: Optional[_DaskCollection],
iteration_range: Optional[Tuple[int, int]],
) -> _DaskCollection:
if iteration_range is None:
iteration_range = (0, 0)
predts = await super()._predict_async(
data=X,
output_margin=output_margin,
output_margin=self.objective == "multi:softmax",
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range,
)
return _cls_predict_proba(self.objective, predts, da.vstack)
vstack = update_wrapper(
partial(da.vstack, allow_unknown_chunksizes=True), da.vstack
)
return _cls_predict_proba(getattr(self, "n_classes_", None), predts, vstack)

# pylint: disable=missing-function-docstring
def predict_proba(
self,
X: _DaskCollection,
ntree_limit: Optional[int] = None,
validate_features: bool = True,
output_margin: bool = False,
base_margin: Optional[_DaskCollection] = None,
iteration_range: Optional[Tuple[int, int]] = None,
) -> Any:
Expand All @@ -1808,7 +1818,6 @@ def predict_proba(
self._predict_proba_async,
X=X,
validate_features=validate_features,
output_margin=output_margin,
base_margin=base_margin,
iteration_range=iteration_range,
)
Expand Down
37 changes: 24 additions & 13 deletions python-package/xgboost/sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import copy
import warnings
import json
from typing import Union, Optional, List, Dict, Callable, Tuple, Any
from typing import Union, Optional, List, Dict, Callable, Tuple, Any, TypeVar
import numpy as np
from .core import Booster, DMatrix, XGBoostError
from .core import _deprecate_positional_args, _convert_ntree_limit
Expand Down Expand Up @@ -561,6 +561,8 @@ def load_model(self, fname):
self._Booster.load_model(fname)
meta = self._Booster.attr('scikit_learn')
if meta is None:
# FIXME(jiaming): This doesn't have to be a problem as most of the needed
# information like num_class and objective is in Learner class.
warnings.warn(
'Loading a native XGBoost model with Scikit-Learn interface.')
return
Expand All @@ -571,6 +573,8 @@ def load_model(self, fname):
self._le = XGBoostLabelEncoder()
self._le.from_json(v)
continue
# FIXME(jiaming): This can be removed once label encoder is gone since we can
# generate it from `np.arange(self.n_classes_)`
if k == 'classes_':
self.classes_ = np.array(v)
continue
Expand Down Expand Up @@ -1024,17 +1028,14 @@ def intercept_(self):
return np.array(json.loads(b.get_dump(dump_format='json')[0])['bias'])


def _cls_predict_proba(
objective: Union[str, Callable], prediction: Any, vstack: Callable
) -> Any:
if objective == 'multi:softmax':
raise ValueError('multi:softmax objective does not support predict_proba,'
' use `multi:softprob` or `binary:logistic` instead.')
if objective == 'multi:softprob' or callable(objective):
# Return prediction directly if objective is defined by user since we don't
# know how to perform the transformation
PredtT = TypeVar("PredtT")


def _cls_predict_proba(n_classes: int, prediction: PredtT, vstack: Callable) -> PredtT:
assert len(prediction.shape) <= 2
if len(prediction.shape) == 2 and prediction.shape[1] == n_classes:
return prediction
# Lastly the binary logistic function
# binary logistic function
classone_probs = prediction
classzero_probs = 1.0 - classone_probs
return vstack((classzero_probs, classone_probs)).transpose()
Expand Down Expand Up @@ -1218,8 +1219,10 @@ def predict(
return class_probs

if len(class_probs.shape) > 1:
# turns softprob into softmax
column_indexes = np.argmax(class_probs, axis=1)
else:
# turns soft logit into class label
column_indexes = np.repeat(0, class_probs.shape[0])
column_indexes[class_probs > 0.5] = 1

Expand Down Expand Up @@ -1262,15 +1265,23 @@ def predict_proba(
a numpy array of shape (n_samples, n_classes) with the
probability of each data example being of a given class.
"""
# custom obj: Do nothing as we don't know what to do.
# softprob: Do nothing, output is proba.
# softmax: Use output margin to remove the argmax in PredTransform.
# binary:logistic: Expand the prob vector into 2-class matrix after predict.
# binary:logitraw: Unsupported by predict_proba()
class_probs = super().predict(
X=X,
output_margin=False,
output_margin=self.objective == "multi:softmax",
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range
)
return _cls_predict_proba(self.objective, class_probs, np.vstack)
# If model is loaded from a raw booster there's no `n_classes_`
return _cls_predict_proba(
getattr(self, "n_classes_", None), class_probs, np.vstack
)

def evals_result(self):
"""Return the evaluation results.
Expand Down
5 changes: 4 additions & 1 deletion tests/python-gpu/test_gpu_with_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def run_gpu_hist(
assert tm.non_increasing(history["train"][dataset.metric])


@pytest.mark.skipif(**tm.no_cudf())
def test_boost_from_prediction(local_cuda_cluster: LocalCUDACluster) -> None:
from sklearn.datasets import load_breast_cancer
with Client(local_cuda_cluster) as client:
Expand Down Expand Up @@ -201,6 +202,7 @@ def test_dask_dataframe(self, local_cuda_cluster: LocalCUDACluster) -> None:
@settings(deadline=duration(seconds=120), suppress_health_check=suppress)
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.skipif(**tm.no_cupy())
@pytest.mark.parametrize(
"local_cuda_cluster", [{"n_workers": 2}], indirect=["local_cuda_cluster"]
)
Expand Down Expand Up @@ -275,7 +277,7 @@ def test_dask_classifier(self, model, local_cuda_cluster: LocalCUDACluster) -> N
X = dask_cudf.from_dask_dataframe(dd.from_dask_array(X_))
y = dask_cudf.from_dask_dataframe(dd.from_dask_array(y_))
w = dask_cudf.from_dask_dataframe(dd.from_dask_array(w_))
run_dask_classifier(X, y, w, model, client)
run_dask_classifier(X, y, w, model, client, 10)

@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
Expand Down Expand Up @@ -453,6 +455,7 @@ async def run_from_dask_array_asyncio(scheduler_address: str) -> dxgb.TrainRetur

@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.skipif(**tm.no_cupy())
@pytest.mark.mgpu
def test_with_asyncio(local_cuda_cluster: LocalCUDACluster) -> None:
with Client(local_cuda_cluster) as client:
Expand Down
45 changes: 27 additions & 18 deletions tests/python/test_with_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,17 @@ def run_dask_classifier(
w: xgb.dask._DaskCollection,
model: str,
client: "Client",
n_classes,
) -> None:
metric = "merror" if n_classes > 2 else "logloss"

if model == "boosting":
classifier = xgb.dask.DaskXGBClassifier(
verbosity=1, n_estimators=2, eval_metric="merror"
verbosity=1, n_estimators=2, eval_metric=metric
)
else:
classifier = xgb.dask.DaskXGBRFClassifier(
verbosity=1, n_estimators=2, eval_metric="merror"
verbosity=1, n_estimators=2, eval_metric=metric
)

assert classifier._estimator_type == "classifier"
Expand All @@ -343,42 +346,43 @@ def run_dask_classifier(
assert isinstance(history, dict)

assert list(history.keys())[0] == "validation_0"
assert list(history["validation_0"].keys())[0] == "merror"
assert list(history["validation_0"].keys())[0] == metric
assert len(list(history["validation_0"])) == 1
forest = int(
json.loads(classifier.get_booster().save_config())["learner"][
"gradient_booster"
]["gbtree_train_param"]["num_parallel_tree"]
)
if model == "boosting":
assert len(history["validation_0"]["merror"]) == 2
assert len(history["validation_0"][metric]) == 2
assert forest == 1
else:
assert len(history["validation_0"]["merror"]) == 1
assert len(history["validation_0"][metric]) == 1
assert forest == 2

# Test .predict_proba()
probas = classifier.predict_proba(X).compute()
assert classifier.n_classes_ == 10
assert classifier.n_classes_ == n_classes
assert probas.ndim == 2
assert probas.shape[0] == kRows
assert probas.shape[1] == 10
assert probas.shape[1] == n_classes

cls_booster = classifier.get_booster()
single_node_proba = cls_booster.inplace_predict(X.compute())
if n_classes > 2:
cls_booster = classifier.get_booster()
single_node_proba = cls_booster.inplace_predict(X.compute())

# test shared by CPU and GPU
if isinstance(single_node_proba, np.ndarray):
np.testing.assert_allclose(single_node_proba, probas)
else:
import cupy
cupy.testing.assert_allclose(single_node_proba, probas)
# test shared by CPU and GPU
if isinstance(single_node_proba, np.ndarray):
np.testing.assert_allclose(single_node_proba, probas)
else:
import cupy
cupy.testing.assert_allclose(single_node_proba, probas)

# Test with dataframe, not shared with GPU as cupy doesn't work well with da.unique.
if isinstance(X, da.Array):
if isinstance(X, da.Array) and n_classes > 2:
X_d: dd.DataFrame = X.to_dask_dataframe()

assert classifier.n_classes_ == 10
assert classifier.n_classes_ == n_classes
prediction_df = classifier.predict(X_d).compute()

assert prediction_df.ndim == 1
Expand All @@ -393,7 +397,12 @@ def run_dask_classifier(
def test_dask_classifier(model: str, client: "Client") -> None:
X, y, w = generate_array(with_weights=True)
y = (y * 10).astype(np.int32)
run_dask_classifier(X, y, w, model, client)
run_dask_classifier(X, y, w, model, client, 10)

y_bin = y.copy()
y_bin[y > 5] = 1.0
y_bin[y <= 5] = 0.0
run_dask_classifier(X, y_bin, w, model, client, 2)


@pytest.mark.skipif(**tm.no_sklearn())
Expand Down

0 comments on commit 73f3111

Please sign in to comment.