Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[solidago] Update docstrings and add simple API for Pipeline #1971

Merged
merged 11 commits into from
Jun 3, 2024
12 changes: 12 additions & 0 deletions backend/ml/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ContributorScaling,
Entity,
)
from vouch.models import Voucher


class MlInputFromDb(TournesolInput):
Expand Down Expand Up @@ -189,3 +190,14 @@ def get_individual_scores(

dtf = pd.DataFrame(values)
return dtf[["user_id", "entity", "criteria", "raw_score"]]

def get_vouches(self):
    """Fetch the vouches exchanged between active users.

    Only vouches whose giver (`by`) and receiver (`to`) are both active
    are returned.

    Returns:
    - DataFrame with columns
        * `voucher`: id of the user who gives the vouch
        * `vouchee`: id of the user who receives the vouch
        * `vouch`: value of this vouch
    """
    # Function-scope import so that this block does not depend on the
    # module-level import list, which is not fully visible here.
    from django.db.models import F

    # Keyword arguments of `QuerySet.values()` must be query expressions:
    # a plain string such as "by__id" is rejected, hence the `F()` wrappers.
    values = Voucher.objects.filter(
        by__is_active=True,
        to__is_active=True,
    ).values(
        voucher=F("by__id"),
        vouchee=F("to__id"),
        vouch=F("value"),
    )
    # Explicit columns keep the expected schema even when no vouch matches
    # (an empty queryset would otherwise yield a DataFrame without columns).
    return pd.DataFrame(values, columns=["voucher", "vouchee", "vouch"])
10 changes: 6 additions & 4 deletions backend/tournesol/lib/public_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def write_comparisons_file(
"criteria",
"score",
"score_max",
"week_date"
"week_date",
]
writer = csv.DictWriter(write_target, fieldnames=fieldnames)
writer.writeheader()
Expand Down Expand Up @@ -413,7 +413,9 @@ def write_vouchers_file(write_target):
"to_username": voucher.to.username,
"value": voucher.value,
}
for voucher in Voucher.objects.filter(is_public=True)
.select_related("by", "to")
.order_by("by__username", "to__username")
for voucher in (
Voucher.objects.filter(is_public=True, by__is_active=True, to__is_active=True)
.select_related("by", "to")
.order_by("by__username", "to__username")
)
)
15 changes: 9 additions & 6 deletions solidago/experiments/tournesol.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
for entity_id, video_id in enumerate(inputs.entity_id_to_video_id)
}

logger.info("Preprocessing data for the pipeline")
users, vouches, all_entities, privacy = inputs.get_pipeline_objects()

# criteria = set(inputs.comparisons["criteria"])
criteria = { "largely_recommended" }
Expand Down Expand Up @@ -89,13 +87,18 @@

user_outputs, entities, voting_rights, scaled_user_models = dict(), dict(), dict(), dict()

users = pipeline.trust_propagation(users, vouches)

for c in criteria:
logger.info(f"Running the pipeline for criterion `{c}`")

judgments = inputs.get_judgments(c)


pipeline_objects = inputs.get_pipeline_kwargs(criterion=c)
users = pipeline_objects["users"]
vouches = pipeline_objects["vouches"]
all_entities = pipeline_objects["entities"]
privacy = pipeline_objects["privacy"]
judgments = pipeline_objects["judgments"]

users = pipeline.trust_propagation(users, vouches)
voting_rights[c], entities[c] = pipeline.voting_rights(users, all_entities, vouches, privacy)
user_models = pipeline.preference_learning(judgments, users, entities[c])
scaled_user_models[c] = pipeline.scaling(user_models, users, entities[c], voting_rights[c], privacy)
Expand Down
2 changes: 2 additions & 0 deletions solidago/src/solidago/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .inputs import TournesolInput
from .outputs import PipelineOutput
from .pipeline import DefaultPipeline, Pipeline

__all__ = ["TournesolInput", "DefaultPipeline", "Pipeline", "PipelineOutput"]
110 changes: 77 additions & 33 deletions solidago/src/solidago/pipeline/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,58 @@ def get_individual_scores(
) -> Optional[pd.DataFrame]:
raise NotImplementedError

@abstractmethod
def get_vouches(self):
    """Return the vouches that users have given to one another.

    Concrete inputs must override this method; this base version
    always raises.

    Returns:
    - DataFrame with columns
        * `voucher`: int, user_id of the user giving the vouch
        * `vouchee`: int, user_id of the user receiving the vouch
        * `vouch`: float, value of the vouch

    Raises:
    - NotImplementedError when the method has not been overridden
    """
    raise NotImplementedError

def get_users(self):
    """Build the users table from the ratings properties.

    Returns:
    - DataFrame indexed by `user_id` with columns
        * `trust_score`: first non-null trust score found among the
          rows of `ratings_properties` belonging to the user
        * `is_pretrusted`: bool, True when `trust_score` is at least 0.8
    """
    first_row_per_user = self.ratings_properties.groupby("user_id").first()
    trust = first_row_per_user[["trust_score"]]
    return trust.assign(is_pretrusted=trust["trust_score"] >= 0.8)

def get_pipeline_kwargs(self, criterion: str):
    """Assemble the keyword arguments describing one criterion's data.

    Parameters
    ----------
    criterion: str
        Criterion passed to `get_comparisons` to select the comparisons

    Returns
    -------
    dict with keys
        * `users`: result of `get_users()`
        * `vouches`: result of `get_vouches()`
        * `entities`: column-less DataFrame indexed by the ids of all
          entities appearing in the selected comparisons
        * `privacy`: PrivacySettings filled for every rating whose
          (user, entity) pair appears in the selected comparisons
        * `judgments`: DataFrameJudgments wrapping the comparisons, with
          `score` / `score_max` renamed to `comparison` / `comparison_max`
    """
    ratings = self.ratings_properties
    users_df = self.get_users()
    vouches_df = self.get_vouches()
    comparisons = self.get_comparisons(criteria=criterion)

    # Every entity seen on either side of a comparison.
    compared_ids = set(comparisons["entity_a"].unique()) | set(
        comparisons["entity_b"].unique()
    )
    entities_df = pd.DataFrame(index=list(compared_ids))

    privacy = PrivacySettings()
    # (user, entity) pairs that take part in at least one comparison.
    compared_pairs = set(
        comparisons[["user_id", "entity_a"]].itertuples(index=False, name=None)
    )
    compared_pairs.update(
        comparisons[["user_id", "entity_b"]].itertuples(index=False, name=None)
    )
    for row in ratings.itertuples():
        pair = (row.user_id, row.entity_id)
        if pair not in compared_pairs:
            # Ratings without any selected comparison are ignored.
            continue
        # The stored flag is the negation of the rating's `is_public`.
        privacy[pair] = not row.is_public

    renamed = comparisons.rename(
        columns={
            "score": "comparison",
            "score_max": "comparison_max",
        }
    )
    judgments = DataFrameJudgments(comparisons=renamed)

    return {
        "users": users_df,
        "vouches": vouches_df,
        "entities": entities_df,
        "privacy": privacy,
        "judgments": judgments,
    }


class TournesolInputFromPublicDataset(TournesolInput):
def __init__(self, dataset_zip: Union[str, BinaryIO]):
Expand All @@ -72,14 +124,18 @@ def __init__(self, dataset_zip: Union[str, BinaryIO]):
self.comparisons = pd.read_csv(comparison_file, keep_default_na=False)
self.entity_id_to_video_id = pd.Series(
list(set(self.comparisons.video_a) | set(self.comparisons.video_b)),
name="video_id"
name="video_id",
)
video_id_to_entity_id = {
video_id: entity_id
for (entity_id, video_id) in self.entity_id_to_video_id.items()
}
self.comparisons["entity_a"] = self.comparisons["video_a"].map(video_id_to_entity_id)
self.comparisons["entity_b"] = self.comparisons["video_b"].map(video_id_to_entity_id)
self.comparisons["entity_a"] = self.comparisons["video_a"].map(
video_id_to_entity_id
)
self.comparisons["entity_b"] = self.comparisons["video_b"].map(
video_id_to_entity_id
)
self.comparisons.drop(columns=["video_a", "video_b"], inplace=True)

with (zipfile.Path(zip_file) / "users.csv").open(mode="rb") as users_file:
Expand All @@ -90,26 +146,25 @@ def __init__(self, dataset_zip: Union[str, BinaryIO]):
# Fill trust_score on newly created users for which it was not computed yet
self.users.trust_score = pd.to_numeric(self.users.trust_score).fillna(0.0)

username_to_user_id = pd.Series(
self.username_to_user_id = pd.Series(
data=self.users.index, index=self.users["public_username"]
)
self.comparisons = self.comparisons.join(username_to_user_id, on="public_username")
self.comparisons = self.comparisons.join(self.username_to_user_id, on="public_username")

with (zipfile.Path(zip_file) / "vouchers.csv").open(mode="rb") as vouchers_file:
# keep_default_na=False is required otherwise some public usernames
# such as "NA" are converted to float NaN.
self.vouchers = pd.read_csv(vouchers_file, keep_default_na=False)

with (zipfile.Path(zip_file) / "collective_criteria_scores.csv").open(mode="rb") as collective_scores_file:
# keep_default_na=False is required otherwise some public usernames
# such as "NA" are converted to float NaN.
self.collective_scores = pd.read_csv(collective_scores_file, keep_default_na=False)

with (zipfile.Path(zip_file) / "individual_criteria_scores.csv").open(mode="rb") as individual_scores_file:
# keep_default_na=False is required otherwise some public usernames
# such as "NA" are converted to float NaN.
self.individual_scores = pd.read_csv(individual_scores_file, keep_default_na=False)


@classmethod
def download(cls) -> "TournesolInputFromPublicDataset":
Expand Down Expand Up @@ -153,27 +208,16 @@ def get_individual_scores(
) -> Optional[pd.DataFrame]:
# TODO: read contributor scores from individual_scores.csv
return None

def get_pipeline_objects(self):
    """Build the objects describing the whole dataset.

    Returns a tuple `(users, vouches, entities, privacy)`:
        * `users`: copy of `self.users` with an extra boolean column
          `is_pretrusted`, True when `trust_score` is at least 0.8
        * `vouches`: empty DataFrame with columns
          `voucher`, `vouchee`, `vouch`
        * `entities`: column-less DataFrame whose index, named
          `entity_id`, lists every entity found in the comparisons
        * `privacy`: PrivacySettings where each (user, entity) pair found
          in the comparisons is set to False
    """
    comparisons = self.comparisons

    users = self.users
    users = users.assign(is_pretrusted=(users["trust_score"] >= 0.8))
    vouches = pd.DataFrame(columns=["voucher", "vouchee", "vouch"])

    entity_ids = set(comparisons["entity_a"]) | set(comparisons["entity_b"])
    entities = pd.DataFrame(index=list(entity_ids))
    entities.index.name = "entity_id"

    privacy = PrivacySettings()
    # Pairs coming from both sides of each comparison.
    compared_pairs = set(
        comparisons[["user_id", "entity_a"]].itertuples(index=False, name=None)
    ).union(
        comparisons[["user_id", "entity_b"]].itertuples(index=False, name=None)
    )
    for pair in compared_pairs:
        privacy[pair] = False

    return users, vouches, entities, privacy

def get_judgments(self, criterion):
    """Wrap the comparisons into a DataFrameJudgments.

    When `criterion` is not None, only the rows whose `criteria` column
    equals it are kept. The `score` column is renamed to `comparison`
    and a `comparison_max` column filled with 10 is added.
    """
    selected = self.comparisons
    if criterion is not None:
        matches_criterion = selected["criteria"] == criterion
        selected = selected[matches_criterion]
    selected = selected.rename(columns={"score": "comparison"})
    max_values = [10] * len(selected)
    selected = selected.assign(comparison_max=max_values)
    return DataFrameJudgments(comparisons=selected)

def get_vouches(self):
    """Return the vouches of the dataset, expressed with user ids.

    Rows of `self.vouchers` whose giver or receiver username is absent
    from `self.username_to_user_id` are dropped; remaining usernames are
    translated to user ids.

    Returns:
    - DataFrame with columns
        * `voucher`: user_id mapped from `by_username`
        * `vouchee`: user_id mapped from `to_username`
        * `vouch`: the `value` column of the vouchers
    """
    user_ids = self.username_to_user_id
    known_usernames = user_ids.index

    giver_known = self.vouchers.by_username.isin(known_usernames)
    receiver_known = self.vouchers.to_username.isin(known_usernames)
    kept = self.vouchers[giver_known & receiver_known]

    columns = {
        "voucher": kept.by_username.map(user_ids),
        "vouchee": kept.to_username.map(user_ids),
        "vouch": kept.value,
    }
    return pd.DataFrame(columns)
26 changes: 20 additions & 6 deletions solidago/src/solidago/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from solidago.aggregation import Aggregation, StandardizedQrMedian, StandardizedQrQuantile, Average, EntitywiseQrQuantile
from solidago.post_process import PostProcess, Squash, NoPostProcess

from solidago.pipeline.inputs import TournesolInput
from solidago.pipeline.outputs import PipelineOutput

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,7 +83,7 @@ def __init__(
aggregation: Aggregation = DefaultPipeline.aggregation,
post_process: PostProcess = DefaultPipeline.post_process,
):
""" Instantiates the pipeline components.
"""Instantiates the pipeline components.

Parameters
----------
Expand Down Expand Up @@ -118,7 +119,22 @@ def from_json(cls, json) -> "Pipeline":
aggregation=aggregation_from_json(json["aggregation"]),
post_process=post_process_from_json(json["post_process"]),
)


def run(
    self,
    input: TournesolInput,
    criterion: str,
    output: Optional[PipelineOutput] = None
):
    """Run the pipeline on the data provided by `input` for one criterion.

    Parameters
    ----------
    input: TournesolInput
        Source of the pipeline data; its `get_pipeline_kwargs(criterion)`
        result is forwarded as keyword arguments to this pipeline
    criterion: str
        Criterion for which the pipeline is run
    output: PipelineOutput or None
        Forwarded unchanged as the `output` keyword argument

    Returns
    -------
    Whatever calling the pipeline instance returns
    """
    # TODO: criterion should be managed by TournesolInput

    # TODO: read existing individual scores from input
    # to pass `init_user_models`
    pipeline_kwargs = input.get_pipeline_kwargs(criterion)
    return self(**pipeline_kwargs, output=output)

def __call__(
self,
users: pd.DataFrame,
Expand Down Expand Up @@ -148,8 +164,6 @@ def __call__(
judgments[user] must yield the judgment data provided by the user
init_user_models: dict[int, ScoringModel]
user_models[user] is the user's model
skip_set: set[int]
Steps that are skipped in the pipeline

Returns
-------
Expand Down Expand Up @@ -229,8 +243,8 @@ def to_json(self):
post_process=self.post_process.to_json()
)

@staticmethod
def save_individual_scalings(
self,
user_models: dict[int, ScaledScoringModel],
output: PipelineOutput,
):
Expand All @@ -251,8 +265,8 @@ def save_individual_scalings(
)
output.save_individual_scalings(scalings_df)

@staticmethod
def save_individual_scores(
self,
user_scorings: dict[int, ScoringModel],
raw_user_scorings: dict[int, ScoringModel],
voting_rights: VotingRights,
Expand Down
5 changes: 4 additions & 1 deletion solidago/src/solidago/preference_learning/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" Step 3 of the pipeline.
""" **Step 3 of the pipeline**

Preference learning infers, for each user and based on their data,
a model of the user's preferences.
Expand All @@ -13,3 +13,6 @@
from .lbfgs_generalized_bradley_terry import LBFGSUniformGBT
except RuntimeError:
pass


__all__ = ["PreferenceLearning", "UniformGBT", "LBFGSUniformGBT"]
12 changes: 7 additions & 5 deletions solidago/src/solidago/preference_learning/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __call__(

Parameters
----------
user_judgments: dict[str, pd.DataFrame]
judgments:
May contain different forms of judgments,
but most likely will contain "comparisons" and/or "assessments"
entities: DataFrame with columns
Expand All @@ -35,8 +35,9 @@ def __call__(
initialization: dict[int, ScoringModel] or ScoringModel or None
Starting models, added to facilitate optimization
It is not supposed to affect the output of the training
new_judgments: New judgments
This allows to prioritize coordinate descent, starting with newly evaluated entities
new_judgments:
New judgments
This allows prioritizing coordinate descent, starting with newly evaluated entities

Returns
-------
Expand Down Expand Up @@ -79,8 +80,9 @@ def user_learn(
initialization: ScoringModel or None
Starting model, added to facilitate optimization
It is not supposed to affect the output of the training
new_judgments: New judgments
This allows to prioritize coordinate descent, starting with newly evaluated entities
new_judgments:
New judgments
This allows prioritizing coordinate descent, starting with newly evaluated entities

Returns
-------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,8 @@ def __init__(
):
"""

Parameters
Parameters (TODO)
----------
initialization: dict[int, float]
previously computed entity scores
error: float
tolerated error
"""
super().__init__(prior_std_dev, convergence_error)
self.cumulant_generating_function_error = cumulant_generating_function_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,8 @@ def __init__(
max_iter: int = 100,
):
"""
Parameters
Parameters (TODO)
----------
initialization: dict[int, float]
previously computed entity scores
error: float
tolerated error
"""
super().__init__(prior_std_dev, convergence_error, max_iter=max_iter)
self.cumulant_generating_function_error = cumulant_generating_function_error
Expand Down
4 changes: 3 additions & 1 deletion solidago/src/solidago/trust_propagation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" Step 1 of the pipeline.
""" **Step 1 of the pipeline**

Trust propagation is tasked to combine pretrusts and vouches
to derive trust scores for the different users.
Expand All @@ -8,3 +8,5 @@
from .no_trust_propagation import NoTrustPropagation
from .lipschitrust import LipschiTrust
from .trust_all import TrustAll

__all__ = ["TrustPropagation", "NoTrustPropagation", "LipschiTrust", "TrustAll"]
Loading
Loading