Commit fde2a83

Merge branch 'solidago-pipeline-docs-1' of github.com:tournesol-app/tournesol into solidago-pipeline-docs-1

lenhoanglnh committed Jun 1, 2024
2 parents 498f4a3 + 3483609
Showing 9 changed files with 361 additions and 285 deletions.
32 changes: 18 additions & 14 deletions solidago/src/solidago/preference_learning/base.py
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
-from typing import Optional, Union
+from typing import Optional

import pandas as pd
import logging
@@ -10,21 +10,25 @@

logger = logging.getLogger(__name__)


class PreferenceLearning(ABC):
+    MAX_UNCERTAINTY = 1000.0

def __call__(
        self,
judgments: Judgments,
users: pd.DataFrame,
entities: pd.DataFrame,
initialization: Optional[dict[int, ScoringModel]] = None,
new_judgments: Optional[Judgments] = None,
) -> dict[int, ScoringModel]:
""" Learns a scoring model, given user judgments of entities
"""Learns a scoring model, given user judgments of entities
Parameters
----------
judgments:
            May contain different forms of judgments,
but most likely will contain "comparisons" and/or "assessments"
entities: DataFrame with columns
* entity_id: int, index
@@ -42,7 +46,7 @@ def __call__(
user_models[user] is the learned scoring model for user
"""
assert isinstance(judgments, Judgments)

user_models = dict() if initialization is None else initialization
for n_user, user in enumerate(users.index):
if n_user % 100 == 0:
@@ -55,21 +59,21 @@
new_judg = None if new_judgments is None else new_judgments[user]
user_models[user] = self.user_learn(judgments[user], entities, init_model, new_judg)
return user_models

@abstractmethod
def user_learn(
        self,
user_judgments: dict[str, pd.DataFrame],
entities: pd.DataFrame,
-        initialization: Optional[ScoringModel]=None,
-        new_judgments: Optional[dict[str, pd.DataFrame]]=None,
+        initialization: Optional[ScoringModel] = None,
+        new_judgments: Optional[dict[str, pd.DataFrame]] = None,
) -> ScoringModel:
""" Learns a scoring model, given user judgments of entities
"""Learns a scoring model, given user judgments of entities
Parameters
----------
user_judgments: dict[str, pd.DataFrame]
            May contain different forms of judgments,
but most likely will contain "comparisons" and/or "assessments"
entities: DataFrame with columns
* entity_id: int, index
@@ -86,9 +90,9 @@ def user_learn(
model: ScoringModel
"""
raise NotImplementedError

def to_json(self):
-        return (type(self).__name__, )
+        return (type(self).__name__,)

def __str__(self):
return type(self).__name__
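
For orientation, the call shape described by the `__call__` docstring above looks as follows. This is a minimal sketch, not part of the diff: the concrete `PreferenceLearning` subclass behind `preference_learning` and the construction of its inputs are assumed.

```python
# Hypothetical usage of a concrete PreferenceLearning subclass.
# All inputs follow the docstring of __call__ above.
user_models = preference_learning(
    judgments,            # Judgments object, e.g. holding "comparisons"/"assessments"
    users,                # pd.DataFrame indexed by user_id
    entities,             # pd.DataFrame indexed by entity_id
    initialization=None,  # optionally a dict[int, ScoringModel] to warm-start
)
# user_models[user] is the learned ScoringModel for that user
```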
166 changes: 94 additions & 72 deletions solidago/src/solidago/preference_learning/generalized_bradley_terry.py
@@ -18,6 +18,7 @@ def __init__(
self,
prior_std_dev: float=7,
convergence_error: float=1e-5,
+        high_likelihood_range_threshold = 1.0,
):
"""
Expand All @@ -30,8 +31,10 @@ def __init__(
"""
self.prior_std_dev = prior_std_dev
self.convergence_error = convergence_error

+        self.high_likelihood_range_threshold = high_likelihood_range_threshold

@property
@abstractmethod
def cumulant_generating_function_derivative(self) -> Callable[[npt.NDArray], npt.NDArray]:
""" The beauty of the generalized Bradley-Terry model is that it suffices
to specify its cumulant generating function derivative to fully define it,
Expand All @@ -46,23 +49,33 @@ def cumulant_generating_function_derivative(self) -> Callable[[npt.NDArray], npt
-------
out: float
"""
raise NotImplementedError


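To make the docstring above concrete: in the generalized Bradley-Terry model, the negative log likelihood of a user's comparisons is built from a single convex function Φ, the cumulant generating function of the root law, so the gradient used by coordinate descent only ever needs Φ'. The following summary is inferred from the surrounding code; the sign conventions are an assumption.

```latex
% sigma = prior_std_dev; r = comparison / comparison_max
\mathcal{L}(\theta)
  = \sum_{(a,b,r)} \Big[\Phi(\theta_a-\theta_b) + r\,(\theta_a-\theta_b)\Big]
  + \sum_a \frac{\theta_a^2}{2\sigma^2},
\qquad
\frac{\partial \mathcal{L}}{\partial \theta_a}
  = \frac{\theta_a}{\sigma^2}
  + \sum_{b \sim a} \Big[\Phi'(\theta_a-\theta_b) + r_{ab}\Big].
```

For `UniformGBT` below, Φ(x) = log(sinh x / x), whose derivative is coth x − 1/x.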
    @property
    @abstractmethod
-    def cumulant_generating_function_second_derivative(self, score_diff: float) -> float:
-        """ We estimate uncertainty by the flatness of the negative log likelihood,
-        which is directly given by the second derivative of the cumulant generating function.
-        Parameters
-        ----------
-        score_diff: float
-            Score difference
-        Returns
-        -------
-        out: float
-        """
+    def log_likelihood_function(self) -> Callable[[npt.NDArray, npt.NDArray], float]:
+        """The loss function definition is used only to compute uncertainties.
+        """
+        pass

@cached_property
def translated_negative_log_likelihood(self):
"""This function is a convex negative log likelihood, translated such
that its minimum has a constant negative value at `delta=0`. The
        roots of this function are used to compute the uncertainty
        intervals. If it has only a single root, then uncertainty on the
other side is considered infinite.
"""
ll_function = self.log_likelihood_function
high_likelihood_range_threshold = self.high_likelihood_range_threshold

@njit
def f(delta, theta_diff, r, coord_indicator, ll_actual):
return (
ll_function(theta_diff + delta * coord_indicator, r)
- ll_actual
- high_likelihood_range_threshold
)

return f

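The root-finding step that consumes this translated loss lives in `comparison_learning` below. Here is a minimal, self-contained sketch of the idea, using `scipy.optimize.brentq` as a stand-in for the repo's `njit_brentq`, and a toy quadratic loss with a high-likelihood threshold of 1.0:

```python
# Sketch: roots of a translated convex loss give an uncertainty interval.
from scipy.optimize import brentq

def translated_nll(delta):
    # Toy convex negative log likelihood with minimum 0 at delta=0,
    # shifted down by high_likelihood_range_threshold = 1.0.
    return delta**2 - 1.0

MAX_UNCERTAINTY = 1000.0
try:
    left = -brentq(translated_nll, -MAX_UNCERTAINTY, 0.0, xtol=1e-2)
except ValueError:      # no sign change: no root on this side
    left = MAX_UNCERTAINTY
try:
    right = brentq(translated_nll, 0.0, MAX_UNCERTAINTY, xtol=1e-2)
except ValueError:
    right = MAX_UNCERTAINTY
print(left, right)      # both approximately 1.0
```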
@cached_property
def update_coordinate_function(self):
@@ -101,16 +114,16 @@ def comparison_learning(
"""
entities = list(set(comparisons["entity_a"]) | set(comparisons["entity_b"]))
entity_coordinates = { entity: c for c, entity in enumerate(entities) }

comparisons_dict = self.comparisons_dict(comparisons, entity_coordinates)

init_solution = np.zeros(len(entities))
if initialization is not None:
for (entity_id, entity_coord) in entity_coordinates.items():
entity_init_values = initialization(entity_id)
if entity_init_values is not None:
init_solution[entity_coord] = entity_init_values[0]

updated_coordinates = list() if updated_entities is None else [
entity_coordinates[entity] for entity in updated_entities
]
@@ -121,37 +134,62 @@ def get_derivative_args(coord: int, sol: np.ndarray):
sol[indices],
comparisons_bis
)

solution = coordinate_descent(
self.update_coordinate_function,
get_args=get_derivative_args,
initialization=init_solution,
updated_coordinates=updated_coordinates,
error=self.convergence_error,
)

-        uncertainties = [
-            self.hessian_diagonal_element(coordinate, solution, comparisons_dict[coordinate][0])
-            for coordinate in range(len(entities))
-        ]
-
-        model = DirectScoringModel()
-        for coordinate in range(len(solution)):
-            model[entities[coordinate]] = solution[coordinate], uncertainties[coordinate]
+        comparisons = comparisons.assign(
+            entity_a_coord=comparisons["entity_a"].map(entity_coordinates),
+            entity_b_coord=comparisons['entity_b'].map(entity_coordinates),
+        )
+        score_diff = solution[comparisons["entity_a_coord"]] - solution[comparisons["entity_b_coord"]]
+        r_actual = (comparisons["comparison"] / comparisons["comparison_max"]).to_numpy()
+
+        uncertainties_left = np.empty_like(solution)
+        uncertainties_right = np.empty_like(solution)
+        ll_actual = self.log_likelihood_function(score_diff, r_actual)
+
+        for coordinate in range(len(solution)):
+            comparison_indicator = (
+                (comparisons["entity_a_coord"] == coordinate).astype(int)
+                - (comparisons["entity_b_coord"] == coordinate).astype(int)
+            ).to_numpy()
+            try:
+                uncertainties_left[coordinate] = -1 * njit_brentq(
+                    self.translated_negative_log_likelihood,
+                    args=(score_diff, r_actual, comparison_indicator, ll_actual),
+                    xtol=1e-2,
+                    a=-self.MAX_UNCERTAINTY,
+                    b=0.0,
+                    extend_bounds="no",
+                )
+            except ValueError:
+                uncertainties_left[coordinate] = self.MAX_UNCERTAINTY
+
+            try:
+                uncertainties_right[coordinate] = njit_brentq(
+                    self.translated_negative_log_likelihood,
+                    args=(score_diff, r_actual, comparison_indicator, ll_actual),
+                    xtol=1e-2,
+                    a=0.0,
+                    b=self.MAX_UNCERTAINTY,
+                    extend_bounds="no",
+                )
+            except ValueError:
+                uncertainties_right[coordinate] = self.MAX_UNCERTAINTY
+
+        model = DirectScoringModel()
+        for coord in range(len(solution)):
+            model[entities[coord]] = solution[coord], uncertainties_left[coord], uncertainties_right[coord]
        return model

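A hedged usage sketch for the method above (assuming `UniformGBT`, defined later in this diff, is importable from `solidago.preference_learning`; the import path is an assumption):

```python
import pandas as pd
from solidago.preference_learning import UniformGBT

comparisons = pd.DataFrame({
    "entity_a":       [1, 1, 2],
    "entity_b":       [2, 3, 3],
    "comparison":     [5, -3, 2],    # raw judgments
    "comparison_max": [10, 10, 10],  # bound used for normalization to [-1, 1]
})

model = UniformGBT().comparison_learning(comparisons)
# model is a DirectScoringModel; per the loop above, each entity maps to
# (score, left_uncertainty, right_uncertainty)
```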
def comparisons_dict(self, comparisons, entity_coordinates) -> dict[int, tuple[npt.NDArray, npt.NDArray]]:
-        comparisons = comparisons[["entity_a","entity_b","comparison", "comparison_max"]]
+        comparisons = (
+            comparisons[
+                ["entity_a","entity_b","comparison", "comparison_max"]
+            ]
+            .assign(
+                pair=comparisons.apply(lambda c: {c["entity_a"], c["entity_b"]}, axis=1)
+            )
+            .drop_duplicates("pair", keep="last")
+            .drop(columns="pair")
+        )
comparisons_sym = pd.concat(
[
comparisons,
@@ -176,7 +214,7 @@ def comparisons_dict(self, comparisons, entity_coordinates) -> dict[int, tuple[n
coord: (group["entity_b"].to_numpy(), group["comparison"].to_numpy())
for (coord, group) in comparisons_sym.groupby("entity_a")
} # type: ignore

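The return annotation above pins down the shape of this mapping; for orientation (values illustrative, not computed):

```python
# comparisons_dict maps each entity coordinate to a pair of aligned arrays:
# the coordinates it was compared against, and the corresponding comparison
# values (symmetrized, so each pair appears under both of its entities).
# {0: (np.array([1, 2]), np.array([ 0.5, -0.3])),
#  1: (np.array([0]),    np.array([-0.5])),
#  2: (np.array([0]),    np.array([ 0.3]))}
```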
@cached_property
def partial_derivative(self):
""" Computes the partial derivative along a coordinate,
@@ -203,23 +241,9 @@ def njit_partial_derivative(
)
)
return njit_partial_derivative

-    def hessian_diagonal_element(
-        self,
-        coordinate: int,
-        solution: np.ndarray,
-        comparisons_indices: np.ndarray,
-    ) -> float:
-        """ Computes the second partial derivative """
-        result = 1 / self.prior_std_dev ** 2
-        for coordinate_bis in comparisons_indices:
-            score_diff = solution[coordinate] - solution[coordinate_bis]
-            result += self.cumulant_generating_function_second_derivative(score_diff)
-        return result


class UniformGBT(GeneralizedBradleyTerry):

def __init__(
self,
prior_std_dev: float = 7,
@@ -234,6 +258,21 @@ def __init__(
super().__init__(prior_std_dev, convergence_error)
self.cumulant_generating_function_error = cumulant_generating_function_error

@cached_property
def log_likelihood_function(self):
@njit
def f(score_diff, r):
score_diff_abs = np.abs(score_diff)
return (
np.where(
score_diff_abs < 20.0,
np.log(np.sinh(score_diff) / score_diff),
score_diff_abs - np.log(2) - np.log(score_diff_abs)
)
+ r * score_diff
).sum()
return f

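The `score_diff_abs < 20.0` branch above avoids overflow in `np.sinh` for large arguments. The asymptotic form it switches to is the standard expansion (stated here for orientation):

```latex
\log\frac{\sinh x}{x}
  = |x| - \log 2 - \log|x| + \log\!\bigl(1 - e^{-2|x|}\bigr)
  \approx |x| - \log 2 - \log|x|,
```

with the dropped term below e^{-40} once |x| ≥ 20.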
@cached_property
def cumulant_generating_function_derivative(self) -> Callable[[npt.NDArray], npt.NDArray]:
tolerance = self.cumulant_generating_function_error
@@ -248,23 +287,6 @@ def f(score_diff: npt.NDArray):

return f

-    def cumulant_generating_function_second_derivative(self, score_diff: float) -> float:
-        """We estimate uncertainty by the flatness of the negative log likelihood,
-        which is directly given by the second derivative of the cumulant generating function.
-        Parameters
-        ----------
-        score_diff: float
-            Score difference
-        Returns
-        -------
-        out: float
-        """
-        if np.abs(score_diff) < self.cumulant_generating_function_error:
-            return (1 / 3) - (score_diff**2 / 15)
-        return 1 - (1 / np.tanh(score_diff) ** 2) + (1 / score_diff**2)

def to_json(self):
return type(self).__name__, dict(
prior_std_dev=self.prior_std_dev,
…