Skip to content

Commit

Permalink
Merge pull request #1971 from tournesol-app/solidago-pipeline-docs-1
Browse files Browse the repository at this point in the history
[solidago] Update docstrings and add simple API for `Pipeline`
  • Loading branch information
amatissart committed Jun 3, 2024
2 parents 82e9c4f + afc32d4 commit 0032c86
Show file tree
Hide file tree
Showing 19 changed files with 188 additions and 153 deletions.
12 changes: 12 additions & 0 deletions backend/ml/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ContributorScaling,
Entity,
)
from vouch.models import Voucher


class MlInputFromDb(TournesolInput):
Expand Down Expand Up @@ -189,3 +190,14 @@ def get_individual_scores(

dtf = pd.DataFrame(values)
return dtf[["user_id", "entity", "criteria", "raw_score"]]

def get_vouches(self):
    """Fetch the vouches exchanged between active users.

    Only vouches whose giver (`by`) and receiver (`to`) are both active
    are returned.

    Returns:
        DataFrame with one row per vouch and columns
        * `voucher`: id of the user who gives the vouch
        * `vouchee`: id of the user who receives the vouch
        * `vouch`: value of this vouch
    """
    # Function-scope import so this method does not depend on how the
    # module-level imports are organised.
    from django.db.models import F

    # Keyword arguments of `QuerySet.values()` are forwarded to `annotate()`,
    # which only accepts expression objects: a bare string such as "by__id"
    # raises TypeError. Field references must therefore be wrapped in `F()`.
    values = Voucher.objects.filter(
        by__is_active=True,
        to__is_active=True,
    ).values(
        voucher=F("by__id"),
        vouchee=F("to__id"),
        vouch=F("value"),
    )
    # Explicit columns keep the expected schema even when no vouch matches,
    # instead of returning a DataFrame without any column.
    return pd.DataFrame(values, columns=["voucher", "vouchee", "vouch"])
10 changes: 6 additions & 4 deletions backend/tournesol/lib/public_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def write_comparisons_file(
"criteria",
"score",
"score_max",
"week_date"
"week_date",
]
writer = csv.DictWriter(write_target, fieldnames=fieldnames)
writer.writeheader()
Expand Down Expand Up @@ -413,7 +413,9 @@ def write_vouchers_file(write_target):
"to_username": voucher.to.username,
"value": voucher.value,
}
for voucher in Voucher.objects.filter(is_public=True)
.select_related("by", "to")
.order_by("by__username", "to__username")
for voucher in (
Voucher.objects.filter(is_public=True, by__is_active=True, to__is_active=True)
.select_related("by", "to")
.order_by("by__username", "to__username")
)
)
15 changes: 9 additions & 6 deletions solidago/experiments/tournesol.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
for entity_id, video_id in enumerate(inputs.entity_id_to_video_id)
}

logger.info("Preprocessing data for the pipeline")
users, vouches, all_entities, privacy = inputs.get_pipeline_objects()

# criteria = set(inputs.comparisons["criteria"])
criteria = { "largely_recommended" }
Expand Down Expand Up @@ -89,13 +87,18 @@

user_outputs, entities, voting_rights, scaled_user_models = dict(), dict(), dict(), dict()

users = pipeline.trust_propagation(users, vouches)

for c in criteria:
logger.info(f"Running the pipeline for criterion `{c}`")

judgments = inputs.get_judgments(c)


pipeline_objects = inputs.get_pipeline_kwargs(criterion=c)
users = pipeline_objects["users"]
vouches = pipeline_objects["vouches"]
all_entities = pipeline_objects["entities"]
privacy = pipeline_objects["privacy"]
judgments = pipeline_objects["judgments"]

users = pipeline.trust_propagation(users, vouches)
voting_rights[c], entities[c] = pipeline.voting_rights(users, all_entities, vouches, privacy)
user_models = pipeline.preference_learning(judgments, users, entities[c])
scaled_user_models[c] = pipeline.scaling(user_models, users, entities[c], voting_rights[c], privacy)
Expand Down
2 changes: 2 additions & 0 deletions solidago/src/solidago/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .inputs import TournesolInput
from .outputs import PipelineOutput
from .pipeline import DefaultPipeline, Pipeline

__all__ = ["TournesolInput", "DefaultPipeline", "Pipeline", "PipelineOutput"]
110 changes: 77 additions & 33 deletions solidago/src/solidago/pipeline/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,58 @@ def get_individual_scores(
) -> Optional[pd.DataFrame]:
raise NotImplementedError

@abstractmethod
def get_vouches(self):
    """Return the vouches given by users to other users.

    Must be implemented by subclasses.

    Returns:
    - DataFrame with one row per vouch and columns
        * `voucher`: int, user_id of the user who gives the vouch
        * `vouchee`: int, user_id of the user who receives the vouch
        * `vouch`: float, value of this vouch
    """
    raise NotImplementedError

def get_users(self):
    """Build the users table from `self.ratings_properties`.

    Returns:
    - DataFrame indexed by `user_id` with columns
        * `trust_score`: first non-null trust score found for the user
        * `is_pretrusted`: bool, True when `trust_score` is at least 0.8
    """
    first_rating_per_user = self.ratings_properties.groupby("user_id").first()
    trust = first_rating_per_user[["trust_score"]]
    return trust.assign(is_pretrusted=trust["trust_score"] >= 0.8)

def get_pipeline_kwargs(self, criterion: str):
    """Assemble the keyword arguments needed to run a pipeline on one criterion.

    Returns:
    - dict with keys
        * `users`: result of `self.get_users()`
        * `vouches`: result of `self.get_vouches()`
        * `entities`: empty DataFrame indexed by every entity appearing
          in the comparisons of `criterion`
        * `privacy`: PrivacySettings filled for each compared
          (user, entity) pair present in `ratings_properties`
        * `judgments`: DataFrameJudgments wrapping the comparisons
    """
    ratings_properties = self.ratings_properties
    users = self.get_users()
    vouches = self.get_vouches()
    comparisons = self.get_comparisons(criteria=criterion)

    # An entity is kept as soon as it appears on either side of a comparison.
    compared_entities = set(comparisons["entity_a"].unique())
    compared_entities |= set(comparisons["entity_b"].unique())
    entities = pd.DataFrame(index=list(compared_entities))

    # (user_id, entity_id) pairs that are actually involved in a comparison.
    compared_pairs = set()
    for side in ("entity_a", "entity_b"):
        compared_pairs.update(
            comparisons[["user_id", side]].itertuples(index=False, name=None)
        )

    # A rating is private when it is not public; ratings without any
    # comparison on this criterion are ignored.
    privacy = PrivacySettings()
    for row in ratings_properties.itertuples():
        pair = (row.user_id, row.entity_id)
        if pair in compared_pairs:
            privacy[pair] = not row.is_public

    # Rename the score columns to the names carried by the judgments.
    renamed_columns = {"score": "comparison", "score_max": "comparison_max"}
    judgments = DataFrameJudgments(
        comparisons=comparisons.rename(columns=renamed_columns)
    )

    return dict(
        users=users,
        vouches=vouches,
        entities=entities,
        privacy=privacy,
        judgments=judgments,
    )


class TournesolInputFromPublicDataset(TournesolInput):
def __init__(self, dataset_zip: Union[str, BinaryIO]):
Expand All @@ -72,14 +124,18 @@ def __init__(self, dataset_zip: Union[str, BinaryIO]):
self.comparisons = pd.read_csv(comparison_file, keep_default_na=False)
self.entity_id_to_video_id = pd.Series(
list(set(self.comparisons.video_a) | set(self.comparisons.video_b)),
name="video_id"
name="video_id",
)
video_id_to_entity_id = {
video_id: entity_id
for (entity_id, video_id) in self.entity_id_to_video_id.items()
}
self.comparisons["entity_a"] = self.comparisons["video_a"].map(video_id_to_entity_id)
self.comparisons["entity_b"] = self.comparisons["video_b"].map(video_id_to_entity_id)
self.comparisons["entity_a"] = self.comparisons["video_a"].map(
video_id_to_entity_id
)
self.comparisons["entity_b"] = self.comparisons["video_b"].map(
video_id_to_entity_id
)
self.comparisons.drop(columns=["video_a", "video_b"], inplace=True)

with (zipfile.Path(zip_file) / "users.csv").open(mode="rb") as users_file:
Expand All @@ -90,26 +146,25 @@ def __init__(self, dataset_zip: Union[str, BinaryIO]):
# Fill trust_score on newly created users for which it was not computed yet
self.users.trust_score = pd.to_numeric(self.users.trust_score).fillna(0.0)

username_to_user_id = pd.Series(
self.username_to_user_id = pd.Series(
data=self.users.index, index=self.users["public_username"]
)
self.comparisons = self.comparisons.join(username_to_user_id, on="public_username")
self.comparisons = self.comparisons.join(self.username_to_user_id, on="public_username")

with (zipfile.Path(zip_file) / "vouchers.csv").open(mode="rb") as vouchers_file:
# keep_default_na=False is required otherwise some public usernames
# such as "NA" are converted to float NaN.
self.vouchers = pd.read_csv(vouchers_file, keep_default_na=False)

with (zipfile.Path(zip_file) / "collective_criteria_scores.csv").open(mode="rb") as collective_scores_file:
# keep_default_na=False is required otherwise some public usernames
# such as "NA" are converted to float NaN.
self.collective_scores = pd.read_csv(collective_scores_file, keep_default_na=False)

with (zipfile.Path(zip_file) / "individual_criteria_scores.csv").open(mode="rb") as individual_scores_file:
# keep_default_na=False is required otherwise some public usernames
# such as "NA" are converted to float NaN.
self.individual_scores = pd.read_csv(individual_scores_file, keep_default_na=False)


@classmethod
def download(cls) -> "TournesolInputFromPublicDataset":
Expand Down Expand Up @@ -153,27 +208,16 @@ def get_individual_scores(
) -> Optional[pd.DataFrame]:
# TODO: read contributor scores from individual_scores.csv
return None

def get_pipeline_objects(self):
users = self.users
users = users.assign(is_pretrusted=(users["trust_score"] >= 0.8))
vouches = pd.DataFrame(columns=["voucher", "vouchee", "vouch"])
entities_indices = set(self.comparisons["entity_a"]) | set(self.comparisons["entity_b"])
entities = pd.DataFrame(index=list(entities_indices))
entities.index.name = "entity_id"
privacy = PrivacySettings()
for (user_id, entity_id) in set(
self.comparisons[["user_id", "entity_a"]].itertuples(index=False, name=None)
).union(
self.comparisons[["user_id", "entity_b"]].itertuples(index=False, name=None)
):
privacy[user_id, entity_id] = False
return users, vouches, entities, privacy

def get_judgments(self, criterion):
comparisons = self.comparisons
if criterion is not None:
comparisons = comparisons[comparisons["criteria"] == criterion]
comparisons = comparisons.rename(columns={"score": "comparison"})
comparisons = comparisons.assign(comparison_max=[10] * len(comparisons))
return DataFrameJudgments(comparisons=comparisons)

def get_vouches(self):
    """Return the vouches of the dataset, expressed with user ids.

    Vouches whose giver or receiver has no entry in
    `self.username_to_user_id` are dropped.

    Returns:
    - DataFrame with columns
        * `voucher`: user_id mapped from `by_username`
        * `vouchee`: user_id mapped from `to_username`
        * `vouch`: the `value` column of the vouchers file
    """
    user_ids = self.username_to_user_id
    known_usernames = user_ids.index

    giver_known = self.vouchers.by_username.isin(known_usernames)
    receiver_known = self.vouchers.to_username.isin(known_usernames)
    kept = self.vouchers[giver_known & receiver_known]

    return pd.DataFrame(
        {
            "voucher": kept.by_username.map(user_ids),
            "vouchee": kept.to_username.map(user_ids),
            "vouch": kept.value,
        }
    )
26 changes: 20 additions & 6 deletions solidago/src/solidago/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from solidago.aggregation import Aggregation, StandardizedQrMedian, StandardizedQrQuantile, Average, EntitywiseQrQuantile
from solidago.post_process import PostProcess, Squash, NoPostProcess

from solidago.pipeline.inputs import TournesolInput
from solidago.pipeline.outputs import PipelineOutput

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,7 +83,7 @@ def __init__(
aggregation: Aggregation = DefaultPipeline.aggregation,
post_process: PostProcess = DefaultPipeline.post_process,
):
""" Instantiates the pipeline components.
"""Instantiates the pipeline components.
Parameters
----------
Expand Down Expand Up @@ -118,7 +119,22 @@ def from_json(cls, json) -> "Pipeline":
aggregation=aggregation_from_json(json["aggregation"]),
post_process=post_process_from_json(json["post_process"]),
)


def run(
    self,
    input: TournesolInput,
    criterion: str,
    output: Optional[PipelineOutput] = None
):
    """Run the pipeline on the data of `input` for a single criterion.

    The pipeline arguments are obtained from
    `input.get_pipeline_kwargs(criterion)` and forwarded, together with
    `output`, to `self.__call__`, whose result is returned.
    """
    # TODO: criterion should be managed by TournesolInput

    # TODO: read existing individual scores from input
    # to pass `init_user_models`
    pipeline_kwargs = input.get_pipeline_kwargs(criterion)
    return self(output=output, **pipeline_kwargs)

def __call__(
self,
users: pd.DataFrame,
Expand Down Expand Up @@ -148,8 +164,6 @@ def __call__(
judgments[user] must yield the judgment data provided by the user
init_user_models: dict[int, ScoringModel]
user_models[user] is the user's model
skip_set: set[int]
Steps that are skipped in the pipeline
Returns
-------
Expand Down Expand Up @@ -229,8 +243,8 @@ def to_json(self):
post_process=self.post_process.to_json()
)

@staticmethod
def save_individual_scalings(
self,
user_models: dict[int, ScaledScoringModel],
output: PipelineOutput,
):
Expand All @@ -251,8 +265,8 @@ def save_individual_scalings(
)
output.save_individual_scalings(scalings_df)

@staticmethod
def save_individual_scores(
self,
user_scorings: dict[int, ScoringModel],
raw_user_scorings: dict[int, ScoringModel],
voting_rights: VotingRights,
Expand Down
5 changes: 4 additions & 1 deletion solidago/src/solidago/preference_learning/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" Step 3 of the pipeline.
""" **Step 3 of the pipeline**
Preference learning infers, for each user and based on their data,
a model of the user's preferences.
Expand All @@ -13,3 +13,6 @@
from .lbfgs_generalized_bradley_terry import LBFGSUniformGBT
except RuntimeError:
pass


__all__ = ["PreferenceLearning", "UniformGBT", "LBFGSUniformGBT"]
12 changes: 7 additions & 5 deletions solidago/src/solidago/preference_learning/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __call__(
Parameters
----------
user_judgments: dict[str, pd.DataFrame]
judgments:
May contain different forms of judgments,
but most likely will contain "comparisons" and/or "assessments"
entities: DataFrame with columns
Expand All @@ -35,8 +35,9 @@ def __call__(
initialization: dict[int, ScoringModel] or ScoringModel or None
Starting models, added to facilitate optimization
It is not supposed to affect the output of the training
new_judgments: New judgments
This allows to prioritize coordinate descent, starting with newly evaluated entities
new_judgments:
New judgments
This allows prioritizing coordinate descent, starting with newly evaluated entities
Returns
-------
Expand Down Expand Up @@ -79,8 +80,9 @@ def user_learn(
initialization: ScoringModel or None
Starting model, added to facilitate optimization
It is not supposed to affect the output of the training
new_judgments: New judgments
This allows to prioritize coordinate descent, starting with newly evaluated entities
new_judgments:
New judgments
This allows prioritizing coordinate descent, starting with newly evaluated entities
Returns
-------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,8 @@ def __init__(
):
"""
Parameters
Parameters (TODO)
----------
initialization: dict[int, float]
previously computed entity scores
error: float
tolerated error
"""
super().__init__(prior_std_dev, convergence_error)
self.cumulant_generating_function_error = cumulant_generating_function_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,8 @@ def __init__(
max_iter: int = 100,
):
"""
Parameters
Parameters (TODO)
----------
initialization: dict[int, float]
previously computed entity scores
error: float
tolerated error
"""
super().__init__(prior_std_dev, convergence_error, max_iter=max_iter)
self.cumulant_generating_function_error = cumulant_generating_function_error
Expand Down
4 changes: 3 additions & 1 deletion solidago/src/solidago/trust_propagation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" Step 1 of the pipeline.
""" **Step 1 of the pipeline**
Trust propagation is tasked to combine pretrusts and vouches
to derive trust scores for the different users.
Expand All @@ -8,3 +8,5 @@
from .no_trust_propagation import NoTrustPropagation
from .lipschitrust import LipschiTrust
from .trust_all import TrustAll

__all__ = ["TrustPropagation", "NoTrustPropagation", "LipschiTrust", "TrustAll"]
Loading

0 comments on commit 0032c86

Please sign in to comment.