From 2bc1d01f275487835d15973bc9915f212512be4d Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Mon, 5 Jun 2023 12:07:47 -0700 Subject: [PATCH 01/30] Make preprocessing modifications V1 --- ludwig/data/preprocessing.py | 47 ++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index 6cac5f23f77..e59437e0297 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1205,6 +1205,53 @@ def build_dataset( logger.debug(f"sample {sample_ratio} of data") dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed) + # If training a reward model, perform grouping and joining on dataset + if "reward" in global_preprocessing_parameters: + reward_parameter_names = [ + "id_column", + "outcome_column", + "chosen_value", + "rejected_value", + ] + if not all( + param_name in global_preprocessing_parameters["reward"] for param_name in reward_parameter_names + ): + raise ValueError( + "Invalid reward training preprocessing parameters, expect " f"{reward_parameter_names}." + ) + + # Todo: add validation to input dataframe + + # Obtain column names and other values + id_column = global_preprocessing_parameters["reward"]["id_column"] + outcome_column = global_preprocessing_parameters["reward"]["outcome_column"] + chosen_value = global_preprocessing_parameters["reward"]["chosen_value"] + rejected_value = global_preprocessing_parameters["reward"]["rejected_value"] + transcript_column = config["input_features"]["name"] + + # Initialize the new refactored dataframe + dataset_df_refactored = dataset_df[0:0] + for column_name in dataset_df_refactored: + dataset_df_refactored.drop(column_name) + dataset_df_refactored[chosen_value] = [] + dataset_df_refactored[rejected_value] = [] + + # Group original dataframe by ID, add group data + dataset_df_groups = dataset_df.groupby(id_column) + refactored_rows = { + chosen_value: [], + rejected_value: []} + for i, group_id in enumerate(dataset_df_groups.groups): + group_df = dataset_df_groups.get_group(group_id) + chosen_transcript = group_df.loc[group_df[outcome_column] == chosen_value][ + transcript_column][0] + rejected_transcript = group_df.loc[group_df[outcome_column] == rejected_value][ + transcript_column][0] + refactored_rows[chosen_value].append(chosen_transcript) + refactored_rows[rejected_value].append(rejected_transcript) + dataset_df_refactored.append(refactored_rows) + dataset_df = dataset_df_refactored + # If persisting DataFrames in memory is enabled, we want to do this after # each batch of parallel ops in order to avoid redundant computation dataset_df = df_engine.persist(dataset_df) From d6cc33189be06d7259dee7431889267867fc2c98 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Mon, 5 Jun 2023 12:30:21 -0700 Subject: [PATCH 02/30] Add dataset validation --- ludwig/data/preprocessing.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index e59437e0297..54e770df9cb 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1216,11 +1216,7 @@ def build_dataset( if not all( param_name in global_preprocessing_parameters["reward"] for param_name in reward_parameter_names ): - raise ValueError( - "Invalid reward training preprocessing parameters, expect " f"{reward_parameter_names}." 
- ) - - # Todo: add validation to input dataframe + raise ValueError(f"Invalid reward training preprocessing parameters, expect {reward_parameter_names}.") # Obtain column names and other values id_column = global_preprocessing_parameters["reward"]["id_column"] @@ -1229,6 +1225,11 @@ def build_dataset( rejected_value = global_preprocessing_parameters["reward"]["rejected_value"] transcript_column = config["input_features"]["name"] + # Validate the input dataframe's columns + dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) + if not sorted(dataset_df.columns) == dataset_columns_expected: + raise ValueError(f"Invalid reward training input dataset, expect columns {dataset_columns_expected}.") + # Initialize the new refactored dataframe dataset_df_refactored = dataset_df[0:0] for column_name in dataset_df_refactored: @@ -1238,15 +1239,11 @@ def build_dataset( # Group original dataframe by ID, add group data dataset_df_groups = dataset_df.groupby(id_column) - refactored_rows = { - chosen_value: [], - rejected_value: []} - for i, group_id in enumerate(dataset_df_groups.groups): + refactored_rows = {chosen_value: [], rejected_value: []} + for group_id in dataset_df_groups.groups: group_df = dataset_df_groups.get_group(group_id) - chosen_transcript = group_df.loc[group_df[outcome_column] == chosen_value][ - transcript_column][0] - rejected_transcript = group_df.loc[group_df[outcome_column] == rejected_value][ - transcript_column][0] + chosen_transcript = group_df.loc[group_df[outcome_column] == chosen_value][transcript_column][0] + rejected_transcript = group_df.loc[group_df[outcome_column] == rejected_value][transcript_column][0] refactored_rows[chosen_value].append(chosen_transcript) refactored_rows[rejected_value].append(rejected_transcript) dataset_df_refactored.append(refactored_rows) From ea07940c74eaf0354db3aa1b680179f7fc7c8a9e Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Mon, 5 Jun 2023 12:33:48 -0700 Subject: [PATCH 03/30] Small edit --- ludwig/data/preprocessing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index 54e770df9cb..29e65ea1347 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1227,7 +1227,7 @@ def build_dataset( # Validate the input dataframe's columns dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) - if not sorted(dataset_df.columns) == dataset_columns_expected: + if not all(sorted(dataset_df.columns) == dataset_columns_expected): raise ValueError(f"Invalid reward training input dataset, expect columns {dataset_columns_expected}.") # Initialize the new refactored dataframe From e993d7b0e0031c0933a8607bd56084dfef6ecce4 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 7 Jun 2023 15:44:33 -0700 Subject: [PATCH 04/30] Add tests --- ludwig/data/preprocessing.py | 88 +++++++++---------- tests/integration_tests/test_preprocessing.py | 37 ++++++++ 2 files changed, 81 insertions(+), 44 deletions(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index 29e65ea1347..cffd6d3bf6f 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1205,50 +1205,6 @@ def build_dataset( logger.debug(f"sample {sample_ratio} of data") dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed) - # If training a reward model, perform grouping and joining 
on dataset - if "reward" in global_preprocessing_parameters: - reward_parameter_names = [ - "id_column", - "outcome_column", - "chosen_value", - "rejected_value", - ] - if not all( - param_name in global_preprocessing_parameters["reward"] for param_name in reward_parameter_names - ): - raise ValueError(f"Invalid reward training preprocessing parameters, expect {reward_parameter_names}.") - - # Obtain column names and other values - id_column = global_preprocessing_parameters["reward"]["id_column"] - outcome_column = global_preprocessing_parameters["reward"]["outcome_column"] - chosen_value = global_preprocessing_parameters["reward"]["chosen_value"] - rejected_value = global_preprocessing_parameters["reward"]["rejected_value"] - transcript_column = config["input_features"]["name"] - - # Validate the input dataframe's columns - dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) - if not all(sorted(dataset_df.columns) == dataset_columns_expected): - raise ValueError(f"Invalid reward training input dataset, expect columns {dataset_columns_expected}.") - - # Initialize the new refactored dataframe - dataset_df_refactored = dataset_df[0:0] - for column_name in dataset_df_refactored: - dataset_df_refactored.drop(column_name) - dataset_df_refactored[chosen_value] = [] - dataset_df_refactored[rejected_value] = [] - - # Group original dataframe by ID, add group data - dataset_df_groups = dataset_df.groupby(id_column) - refactored_rows = {chosen_value: [], rejected_value: []} - for group_id in dataset_df_groups.groups: - group_df = dataset_df_groups.get_group(group_id) - chosen_transcript = group_df.loc[group_df[outcome_column] == chosen_value][transcript_column][0] - rejected_transcript = group_df.loc[group_df[outcome_column] == rejected_value][transcript_column][0] - refactored_rows[chosen_value].append(chosen_transcript) - refactored_rows[rejected_value].append(rejected_transcript) - dataset_df_refactored.append(refactored_rows) - dataset_df = dataset_df_refactored - # If persisting DataFrames in memory is enabled, we want to do this after # each batch of parallel ops in order to avoid redundant computation dataset_df = df_engine.persist(dataset_df) @@ -1400,6 +1356,50 @@ def build_dataset( # Embed features with fixed encoders dataset = embed_fixed_features(dataset, feature_configs, metadata, backend) + # If training a reward model, perform grouping and joining on dataset + if mode == "training" and "reward" in global_preprocessing_parameters: + reward_parameter_names = [ + "id_column", + "outcome_column", + "chosen_value", + "rejected_value", + ] + if not all( + param_name in global_preprocessing_parameters["reward"] for param_name in reward_parameter_names + ): + raise ValueError(f"Invalid reward training preprocessing parameters, expect {reward_parameter_names}.") + + # Obtain column names and other values + id_column = global_preprocessing_parameters["reward"]["id_column"] + outcome_column = global_preprocessing_parameters["reward"]["outcome_column"] + chosen_value = global_preprocessing_parameters["reward"]["chosen_value"] + rejected_value = global_preprocessing_parameters["reward"]["rejected_value"] + transcript_column = config["input_features"]["name"] + + # Validate the input dataframe's columns + dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) + if not all(sorted(dataset.columns) == dataset_columns_expected): + raise ValueError(f"Invalid reward training input dataset, expect columns {dataset_columns_expected}.") + + # 
Initialize the new refactored dataframe + dataset_refactored = dataset[0:0] + for column_name in dataset_refactored: + dataset_refactored.drop(column_name) + dataset_refactored[chosen_value] = [] + dataset_refactored[rejected_value] = [] + + # Group original dataframe by ID, add group data + dataset_groups = dataset.groupby(id_column) + refactored_rows = {chosen_value: [], rejected_value: []} + for group_id in dataset_groups.groups: + group_df = dataset_groups.get_group(group_id) + chosen_transcript = group_df.loc[group_df[outcome_column] == chosen_value][transcript_column][0] + rejected_transcript = group_df.loc[group_df[outcome_column] == rejected_value][transcript_column][0] + refactored_rows[chosen_value].append(chosen_transcript) + refactored_rows[rejected_value].append(rejected_transcript) + dataset_refactored.append(refactored_rows) + dataset = dataset_refactored + return dataset, metadata diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 1bf0610443e..17ce64560ba 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -163,6 +163,43 @@ def test_strip_whitespace_category(csv_filename, tmpdir): assert len(np.unique(train_ds.dataset[cat_feat[PROC_COLUMN]])) == cat_feat[DECODER]["vocab_size"] +def test_reward_model_dataset_refactor(): + input_features = [text_feature()] + output_features = [number_feature(), category_feature(decoder={"vocab_size": 2})] + backend = LocalTestBackend() + config = {"input_features": input_features, "output_features": output_features} + + # Generate random dataframe + dataframe = generate_data_as_dataframe(input_features, output_features, num_examples=20) + + # Add reward model training pairs + id_column, outcome_column = "", "" + for column_name in dataframe.columns: + if "number" in column_name: + id_column = column_name + elif "category" in column_name: + outcome_column = column_name + dataframe[id_column] = dataframe.index // 2 + dataframe[outcome_column] = np.where(dataframe.index % 2, "rejected", "chosen") + + # Modify config with preprocessing + config["preprocessing"] = { + "reward": { + "id_column": id_column, + "outcome_column": outcome_column, + "chosen_value": "chosen", + "rejected_value": "rejected", + } + } + + # Run preprocessing, get output dataset + ludwig_model = LudwigModel(config, backend=backend) + train_dataset, _, _, metadata = ludwig_model.preprocess(dataset=dataframe) + + # expect values containing whitespaces to be properly mapped to vocab_size unique values + assert len(np.unique(train_ds.dataset[cat_feat[PROC_COLUMN]])) == cat_feat[DECODER]["vocab_size"] + + @pytest.mark.parametrize( "backend", [ From 98db29be720e618afcb241f65b32d602c075cce0 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 7 Jun 2023 17:00:10 -0700 Subject: [PATCH 05/30] Small edits --- ludwig/data/preprocessing.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index cffd6d3bf6f..1ebd2695cba 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1364,9 +1364,7 @@ def build_dataset( "chosen_value", "rejected_value", ] - if not all( - param_name in global_preprocessing_parameters["reward"] for param_name in reward_parameter_names - ): + if not all(param_name in global_preprocessing_parameters["reward"] for param_name in reward_parameter_names): raise 
ValueError(f"Invalid reward training preprocessing parameters, expect {reward_parameter_names}.") # Obtain column names and other values @@ -1378,26 +1376,25 @@ def build_dataset( # Validate the input dataframe's columns dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) - if not all(sorted(dataset.columns) == dataset_columns_expected): + if not sorted(dataset.columns) == dataset_columns_expected: raise ValueError(f"Invalid reward training input dataset, expect columns {dataset_columns_expected}.") # Initialize the new refactored dataframe dataset_refactored = dataset[0:0] - for column_name in dataset_refactored: - dataset_refactored.drop(column_name) dataset_refactored[chosen_value] = [] dataset_refactored[rejected_value] = [] + dataset_refactored.drop([id_column, outcome_column, transcript_column], axis=1, inplace=True) # Group original dataframe by ID, add group data dataset_groups = dataset.groupby(id_column) - refactored_rows = {chosen_value: [], rejected_value: []} for group_id in dataset_groups.groups: group_df = dataset_groups.get_group(group_id) - chosen_transcript = group_df.loc[group_df[outcome_column] == chosen_value][transcript_column][0] - rejected_transcript = group_df.loc[group_df[outcome_column] == rejected_value][transcript_column][0] - refactored_rows[chosen_value].append(chosen_transcript) - refactored_rows[rejected_value].append(rejected_transcript) - dataset_refactored.append(refactored_rows) + chosen_transcript = group_df.loc[group_df[outcome_column] == chosen_value][transcript_column].iloc[0] + rejected_transcript = group_df.loc[group_df[outcome_column] == rejected_value][transcript_column].iloc[0] + dataset_refactored.loc[len(dataset_refactored.index)] = { + chosen_value: chosen_transcript, + rejected_value: rejected_transcript, + } dataset = dataset_refactored return dataset, metadata From 518feca96c3233b89b2fff0a026fc784125c1e9f Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 7 Jun 2023 17:02:54 -0700 Subject: [PATCH 06/30] Another small edit --- tests/integration_tests/test_preprocessing.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 17ce64560ba..18d3ff25f2f 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -196,8 +196,7 @@ def test_reward_model_dataset_refactor(): ludwig_model = LudwigModel(config, backend=backend) train_dataset, _, _, metadata = ludwig_model.preprocess(dataset=dataframe) - # expect values containing whitespaces to be properly mapped to vocab_size unique values - assert len(np.unique(train_ds.dataset[cat_feat[PROC_COLUMN]])) == cat_feat[DECODER]["vocab_size"] + # Todo: validate the output training dataset @pytest.mark.parametrize( From 8ac5101253846e832687b3101ee1fce4451be356 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Sun, 11 Jun 2023 20:04:03 -0700 Subject: [PATCH 07/30] Modify processing strategy --- ludwig/data/preprocessing.py | 101 +++++++++++------- .../metadata/configs/preprocessing.yaml | 5 + ludwig/schema/preprocessing.py | 8 ++ tests/integration_tests/test_preprocessing.py | 32 +++++- 4 files changed, 104 insertions(+), 42 deletions(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index 1ebd2695cba..fe6996414c1 100644 --- a/ludwig/data/preprocessing.py +++ 
b/ludwig/data/preprocessing.py @@ -1322,6 +1322,53 @@ def build_dataset( if reshape is not None: proc_cols[proc_column] = backend.df_engine.map_objects(proc_cols[proc_column], lambda x: x.reshape(-1)) + # If training a reward model, prepare the processed columns + if mode == "training" and "reward_dataset" in global_preprocessing_parameters: + reward_parameter_names = [ + "id_column", + "outcome_column", + "chosen_value", + "rejected_value", + ] + if not all( + param_name in global_preprocessing_parameters["reward_dataset"] for param_name in reward_parameter_names + ): + raise ValueError(f"Invalid reward training preprocessing parameters, expect {reward_parameter_names}.") + + # Obtain column names and other values + id_column = global_preprocessing_parameters["reward_dataset"]["id_column"] + outcome_column = global_preprocessing_parameters["reward_dataset"]["outcome_column"] + chosen_value = global_preprocessing_parameters["reward_dataset"]["chosen_value"] + rejected_value = global_preprocessing_parameters["reward_dataset"]["rejected_value"] + transcript_column = config["input_features"][0]["name"] + + # Validate the input dataframe's columns + dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) + if not sorted(dataset_df.columns) == dataset_columns_expected: + raise ValueError( + f"Invalid reward training input dataset, expect columns {dataset_columns_expected}, " + f"got columns {sorted(dataset_df.columns)}." + ) + + # Validate the processed dataset columns + processed_column_names = ["_".join(column_name.split("_")[:2]) for column_name in sorted(proc_cols.keys())] + if not processed_column_names == dataset_columns_expected: + raise ValueError( + f"Invalid reward training processed dataset, expect columns {dataset_columns_expected}, " + f"got columns {processed_column_names}." 
+ ) + + # Remove ID and outcome columns from processed + for column_name in proc_cols.keys(): + if id_column in column_name: + proc_cols[column_name] = dataset_df[id_column] + id_column = column_name + elif outcome_column in column_name: + proc_cols[column_name] = dataset_df[outcome_column] + outcome_column = column_name + elif transcript_column in column_name: + transcript_column = column_name + # Implements an outer join of proc_cols dataset = backend.df_engine.df_like(dataset_df, proc_cols) @@ -1357,44 +1404,22 @@ def build_dataset( dataset = embed_fixed_features(dataset, feature_configs, metadata, backend) # If training a reward model, perform grouping and joining on dataset - if mode == "training" and "reward" in global_preprocessing_parameters: - reward_parameter_names = [ - "id_column", - "outcome_column", - "chosen_value", - "rejected_value", - ] - if not all(param_name in global_preprocessing_parameters["reward"] for param_name in reward_parameter_names): - raise ValueError(f"Invalid reward training preprocessing parameters, expect {reward_parameter_names}.") - - # Obtain column names and other values - id_column = global_preprocessing_parameters["reward"]["id_column"] - outcome_column = global_preprocessing_parameters["reward"]["outcome_column"] - chosen_value = global_preprocessing_parameters["reward"]["chosen_value"] - rejected_value = global_preprocessing_parameters["reward"]["rejected_value"] - transcript_column = config["input_features"]["name"] - - # Validate the input dataframe's columns - dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) - if not sorted(dataset.columns) == dataset_columns_expected: - raise ValueError(f"Invalid reward training input dataset, expect columns {dataset_columns_expected}.") - - # Initialize the new refactored dataframe - dataset_refactored = dataset[0:0] - dataset_refactored[chosen_value] = [] - dataset_refactored[rejected_value] = [] - dataset_refactored.drop([id_column, outcome_column, transcript_column], axis=1, inplace=True) - - # Group original dataframe by ID, add group data - dataset_groups = dataset.groupby(id_column) - for group_id in dataset_groups.groups: - group_df = dataset_groups.get_group(group_id) - chosen_transcript = group_df.loc[group_df[outcome_column] == chosen_value][transcript_column].iloc[0] - rejected_transcript = group_df.loc[group_df[outcome_column] == rejected_value][transcript_column].iloc[0] - dataset_refactored.loc[len(dataset_refactored.index)] = { - chosen_value: chosen_transcript, - rejected_value: rejected_transcript, - } + if mode == "training" and "reward_dataset" in global_preprocessing_parameters: + reward_sample_value_map = { + chosen_value: "chosen", + rejected_value: "rejected", + } + + # Group dataset rows by ID, aggregate group data + dataset_id_groups = dataset.groupby(id_column) + dataset_refactored = ( + dataset_id_groups[outcome_column] + .apply(lambda x: [reward_sample_value_map[value] for value in list(x)]) + .reset_index() + ) + dataset_refactored[transcript_column] = ( + dataset_id_groups[transcript_column].apply(list).reset_index()[transcript_column] + ) dataset = dataset_refactored return dataset, metadata diff --git a/ludwig/schema/metadata/configs/preprocessing.yaml b/ludwig/schema/metadata/configs/preprocessing.yaml index 11218989437..9efa16092c2 100644 --- a/ludwig/schema/metadata/configs/preprocessing.yaml +++ b/ludwig/schema/metadata/configs/preprocessing.yaml @@ -146,3 +146,8 @@ cache_encoder_embeddings: it's not always the case that you would always 
want to enable it when possible. expected_impact: 1 ui_display_name: Cache Encoder Embeddings +reward_dataset: + id_column: The name of the reward model training dataset session ID column + outcome_column: The name of the reward model training dataset chosen/rejected outcome column + chosen_value: The value of the string in the outcome column corresponding to chosen samples + rejected_value: The value of the string in the outcome column corresponding to rejected samples diff --git a/ludwig/schema/preprocessing.py b/ludwig/schema/preprocessing.py index 30127893240..17897aea0ec 100644 --- a/ludwig/schema/preprocessing.py +++ b/ludwig/schema/preprocessing.py @@ -1,3 +1,4 @@ +from typing import Any, Dict from ludwig.api_annotations import DeveloperAPI from ludwig.constants import RANDOM from ludwig.schema import utils as schema_utils @@ -38,6 +39,13 @@ class PreprocessingConfig(schema_utils.BaseMarshmallowConfig): default=RANDOM, ) + reward_dataset: Dict[str, Any] = schema_utils.Dict( + default=None, + allow_none=True, + description="If not None, the input dataset will be preprocessed to train an RLHF reward model.", + parameter_metadata=PREPROCESSING_METADATA["reward_dataset"], + ) + @DeveloperAPI class PreprocessingField(schema_utils.DictMarshmallowField): diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 18d3ff25f2f..de740ab64ca 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -183,12 +183,14 @@ def test_reward_model_dataset_refactor(): dataframe[outcome_column] = np.where(dataframe.index % 2, "rejected", "chosen") # Modify config with preprocessing + chosen_value = "chosen" + rejected_value = "rejected" config["preprocessing"] = { - "reward": { + "reward_dataset": { "id_column": id_column, "outcome_column": outcome_column, - "chosen_value": "chosen", - "rejected_value": "rejected", + "chosen_value": chosen_value, + "rejected_value": rejected_value, } } @@ -196,7 +198,29 @@ def test_reward_model_dataset_refactor(): ludwig_model = LudwigModel(config, backend=backend) train_dataset, _, _, metadata = ludwig_model.preprocess(dataset=dataframe) - # Todo: validate the output training dataset + # Validate the processed dataset columns + dataset = train_dataset.dataset + transcript_column = config["input_features"][0]["name"] + dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) + dataset_columns_actual = ["_".join(column_name.split("_")[:2]) for column_name in sorted(dataset.keys())] + assert dataset_columns_actual == dataset_columns_expected + + # Augment column names to processed versions + for column_name in dataset.keys(): + if id_column in column_name: + id_column = column_name + elif outcome_column in column_name: + outcome_column = column_name + elif transcript_column in column_name: + transcript_column = column_name + + # Validate each row in the processed dataset + for row_id in range(len(dataset[id_column])): + assert len(dataset[outcome_column][row_id]) == 2 + assert len(dataset[transcript_column][row_id]) == 2 + assert dataset[outcome_column][row_id][0] in [chosen_value, rejected_value] + assert dataset[outcome_column][row_id][1] in [chosen_value, rejected_value] + assert dataset[outcome_column][row_id][0] != dataset[outcome_column][row_id][1] @pytest.mark.parametrize( From b61a174df9986a9a679eb74e19ca089217ebcb49 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Sun, 11 Jun 2023 
20:05:52 -0700 Subject: [PATCH 08/30] Small edit --- ludwig/data/preprocessing.py | 2 +- ludwig/schema/preprocessing.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index fe6996414c1..32d45634a70 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1358,7 +1358,7 @@ def build_dataset( f"got columns {processed_column_names}." ) - # Remove ID and outcome columns from processed + # Augment column names to processed versions for column_name in proc_cols.keys(): if id_column in column_name: proc_cols[column_name] = dataset_df[id_column] diff --git a/ludwig/schema/preprocessing.py b/ludwig/schema/preprocessing.py index 17897aea0ec..d50c58c2cf4 100644 --- a/ludwig/schema/preprocessing.py +++ b/ludwig/schema/preprocessing.py @@ -1,4 +1,5 @@ from typing import Any, Dict + from ludwig.api_annotations import DeveloperAPI from ludwig.constants import RANDOM from ludwig.schema import utils as schema_utils From e78152d2ff795c38155c103a9fee45ff46db3943 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Sun, 11 Jun 2023 20:28:16 -0700 Subject: [PATCH 09/30] Another small edit --- tests/integration_tests/test_preprocessing.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index de740ab64ca..ba2c6960f33 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -180,11 +180,11 @@ def test_reward_model_dataset_refactor(): elif "category" in column_name: outcome_column = column_name dataframe[id_column] = dataframe.index // 2 - dataframe[outcome_column] = np.where(dataframe.index % 2, "rejected", "chosen") - - # Modify config with preprocessing chosen_value = "chosen" rejected_value = "rejected" + dataframe[outcome_column] = np.where(dataframe.index % 2, rejected_value, chosen_value) + + # Modify config with preprocessing config["preprocessing"] = { "reward_dataset": { "id_column": id_column, From 440cbecaeec656b68d9638f49f83d7cf9facced5 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Sun, 11 Jun 2023 20:32:26 -0700 Subject: [PATCH 10/30] Small edit --- tests/integration_tests/test_preprocessing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index ba2c6960f33..91e526c3304 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -180,8 +180,8 @@ def test_reward_model_dataset_refactor(): elif "category" in column_name: outcome_column = column_name dataframe[id_column] = dataframe.index // 2 - chosen_value = "chosen" - rejected_value = "rejected" + chosen_value = "some_value_1" + rejected_value = "some_value_2" dataframe[outcome_column] = np.where(dataframe.index % 2, rejected_value, chosen_value) # Modify config with preprocessing @@ -218,8 +218,8 @@ def test_reward_model_dataset_refactor(): for row_id in range(len(dataset[id_column])): assert len(dataset[outcome_column][row_id]) == 2 assert len(dataset[transcript_column][row_id]) == 2 - assert dataset[outcome_column][row_id][0] in [chosen_value, rejected_value] - assert dataset[outcome_column][row_id][1] in [chosen_value, rejected_value] + assert dataset[outcome_column][row_id][0] in ["chosen", 
"rejected"] + assert dataset[outcome_column][row_id][1] in ["chosen", "rejected"] assert dataset[outcome_column][row_id][0] != dataset[outcome_column][row_id][1] From b12f8ac43a772fb7a643030ce6bb80055d07f02b Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 7 Jun 2023 17:26:01 -0700 Subject: [PATCH 11/30] Add loss items --- ludwig/constants.py | 1 + ludwig/modules/loss_modules.py | 12 ++++++++++++ ludwig/schema/features/loss/loss.py | 28 ++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/ludwig/constants.py b/ludwig/constants.py index b72791b536f..7dfed99414c 100644 --- a/ludwig/constants.py +++ b/ludwig/constants.py @@ -72,6 +72,7 @@ MEAN_ABSOLUTE_PERCENTAGE_ERROR = "mean_absolute_percentage_error" HUBER = "huber" CORN = "corn" +REWARD = "reward_model_loss" R2 = "r2" EDIT_DISTANCE = "edit_distance" PERPLEXITY = "perplexity" diff --git a/ludwig/modules/loss_modules.py b/ludwig/modules/loss_modules.py index 62be167ef57..7ec5657b4a0 100644 --- a/ludwig/modules/loss_modules.py +++ b/ludwig/modules/loss_modules.py @@ -35,6 +35,7 @@ MAPELossConfig, MSELossConfig, NextTokenSoftmaxCrossEntropyLossConfig, + RewardLossConfig, RMSELossConfig, RMSPELossConfig, SequenceSoftmaxCrossEntropyLossConfig, @@ -264,3 +265,14 @@ def __init__(self, config: CORNLossConfig): def forward(self, preds: Tensor, target: Tensor) -> Tensor: num_classes = preds.shape[1] return corn_loss(preds, target, num_classes=num_classes) + + +@register_loss(RewardLossConfig) +class RewardLoss(nn.Module, LogitsInputsMixin): + """Reward loss.""" + + def __init__(self, config: RewardLossConfig): + super().__init__() + + def forward(self, chosen_reward: Tensor, rejected_reward: Tensor) -> Tensor: + return -1 * nn.functional.logsigmoid(chosen_reward - rejected_reward).mean() diff --git a/ludwig/schema/features/loss/loss.py b/ludwig/schema/features/loss/loss.py index f4ec1472e9d..48c905c44fc 100644 --- a/ludwig/schema/features/loss/loss.py +++ b/ludwig/schema/features/loss/loss.py @@ -12,6 +12,7 @@ MEAN_SQUARED_ERROR, NEXT_TOKEN_SOFTMAX_CROSS_ENTROPY, NUMBER, + REWARD, ROOT_MEAN_SQUARED_ERROR, ROOT_MEAN_SQUARED_PERCENTAGE_ERROR, SEQUENCE, @@ -475,3 +476,30 @@ def class_weights(self) -> int: @property def class_similarities_temperature(self) -> int: return 0 + + +@DeveloperAPI +@register_loss([NUMBER]) +@ludwig_dataclass +class RewardLossConfig(BaseLossConfig): + type: str = schema_utils.ProtectedString( + REWARD, + description=( + "This loss function is used to train reward models in Ludwig, for the purposes of RLHF. The " + "reward model will typically be an LLM or other large Transformer, with a single numerical output " + "feature. To train these models, data is provided in terms of pairs of responses (texts), one " + "of which is chosen and one of which is rejected, representing a human ranking assessment. The " + "model is trained using a contrastive loss procedure, maximizing the difference between the reward " + "score of the chosen text and the score of the rejected text." 
+ ), + ) + + weight: float = schema_utils.NonNegativeFloat( + default=1.0, + description="Weight of the loss.", + parameter_metadata=LOSS_METADATA["RewardLoss"]["weight"], + ) + + @classmethod + def name(self) -> str: + return "Reward Model Loss" From 936194d4cfcdc85ef3fafeb2595251f60d473d03 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 7 Jun 2023 18:32:39 -0700 Subject: [PATCH 12/30] Add trainer --- ludwig/schema/trainer.py | 15 ++++ ludwig/trainers/trainer_rlhf.py | 118 ++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 ludwig/trainers/trainer_rlhf.py diff --git a/ludwig/schema/trainer.py b/ludwig/schema/trainer.py index 60cdb4caace..0198efb1d55 100644 --- a/ludwig/schema/trainer.py +++ b/ludwig/schema/trainer.py @@ -811,6 +811,21 @@ class FineTuneTrainerConfig(ECDTrainerConfig): ) +@DeveloperAPI +@register_llm_trainer_schema("reward_model") +@ludwig_dataclass +class RewardModelTrainerConfig(ECDTrainerConfig): + """Dataclass that configures most of the hyperparameters used for LLM RLHF reward model training.""" + + # Required for lookup during trainer initialization + type: str = schema_utils.ProtectedString("reward_model") + + base_learning_rate: float = schema_utils.NonNegativeFloat( + default=0.0, + description="Base learning rate used for training in the RLHF reward model trainer.", + ) + + @DeveloperAPI def get_model_type_jsonschema(model_type: str = MODEL_ECD): enum = [MODEL_ECD] diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py new file mode 100644 index 00000000000..99217259557 --- /dev/null +++ b/ludwig/trainers/trainer_rlhf.py @@ -0,0 +1,118 @@ +import logging +import torch +from typing import Dict, List, Optional, Tuple + +from ludwig.models.llm import LLM +from ludwig.modules.loss_modules import RewardLoss +from ludwig.schema.trainer import RewardModelTrainerConfig +from ludwig.trainers.registry import register_llm_trainer +from ludwig.trainers.trainer import Trainer +from ludwig.utils.defaults import default_random_seed +from ludwig.distributed.base import DistributedStrategy + +logger = logging.getLogger(__name__) + + +@register_llm_trainer("reward_model") +class RewardModelTrainer(Trainer): + @staticmethod + def get_schema_cls(): + return RewardModelTrainerConfig + + def __init__( + self, + config: RewardModelTrainerConfig, + model: LLM, + resume: float = False, + skip_save_model: bool = False, + skip_save_progress: bool = False, + skip_save_log: bool = False, + callbacks: List = None, + report_tqdm_to_ray=False, + random_seed: float = default_random_seed, + distributed: Optional[DistributedStrategy] = None, + device: Optional[str] = None, + **kwargs, + ): + super().__init__( + config, + model, + resume, + skip_save_model, + skip_save_progress, + skip_save_log, + callbacks, + report_tqdm_to_ray, + random_seed, + distributed, + device, + **kwargs, + ) + + # Save the reward model loss function + self.reward_loss_function = RewardLoss({}) + + # Save the reward model dataset parameters + if "preprocessing" not in config or "reward" not in config["preprocessing"]: + raise ValueError("Invalid reward model training config, expect preprocessing reward attributes.") + self.reward_model_dataset_params = config["preprocessing"]["reward"] + + def train_step( + self, inputs: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor], should_step: bool = True + ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]: + """Performs a single training step of the RLHF 
reward model. + + Params: + inputs: A dictionary of input data, from feature name to tensor. + targets: A dictionary of target data, from feature name to tensor. + should_step: Whether to perform a step of the optimizer after computing gradients. + + Returns: + A tuple of the loss tensor and a dictionary of loss for every output feature. + """ + chosen_value = self.reward_model_dataset_params["chosen_value"] + rejected_value = self.reward_model_dataset_params["rejected_value"] + if chosen_value not in inputs or rejected_value not in inputs: + raise ValueError("Reward model preprocessing error: should have chosen/rejected values as table columns") + + # Other validations + if not all( + self.use_amp is False, + self.evaluate_training_set is True, + ): + raise ValueError("Invalid trainer arguments for RLHF reward model") + + # Run forward-propagation of the chosen and rejected inputs + with self.distributed.prepare_model_update(self.dist_model, should_step=should_step): + # Obtain model predictions and loss + model_output_chosen = self.dist_model((inputs[chosen_value])) + model_output_rejected = self.dist_model((inputs[rejected_value])) + loss = self.reward_loss_function(model_output_chosen, model_output_rejected) + loss = loss / self.gradient_accumulation_steps + all_losses = loss + + # Begin the backward pass + variables = self.dist_model.parameters() + self.distributed.backward(loss, self.dist_model) + + if not should_step: + # Short-circuit the parameter updates if we are still accumulating gradients + return loss, all_losses + + # Wait for gradient aggregation to complete before clipping the gradients + # When using AMP, we need to do this before unscaling. + # See: https://github.com/horovod/horovod/blob/master/examples/pytorch/pytorch_mnist.py + self.distributed.wait_optimizer_synced(self.optimizer) + + if self.distributed.allow_clip_gradients(): + # Clip gradients + self.clip_grads(variables) + + # Apply gradient updates + with self.distributed.prepare_optimizer_update(self.optimizer): + # Because we already synchronized above, we skip doing so here + self.distributed.step(self.optimizer) + + self.distributed.zero_grad(self.optimizer) + + return loss, all_losses From 2b74e7b43d56ae0dcd7ad62b601601face5a53ab Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Tue, 13 Jun 2023 06:41:21 -0700 Subject: [PATCH 13/30] Modify reward model trainer --- ludwig/trainers/trainer_rlhf.py | 35 +++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index 99217259557..382e043d62c 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -53,9 +53,10 @@ def __init__( self.reward_loss_function = RewardLoss({}) # Save the reward model dataset parameters - if "preprocessing" not in config or "reward" not in config["preprocessing"]: + if "preprocessing" not in config or "reward_dataset" not in config["preprocessing"]: raise ValueError("Invalid reward model training config, expect preprocessing reward attributes.") - self.reward_model_dataset_params = config["preprocessing"]["reward"] + self.reward_model_dataset_params = config["preprocessing"]["reward_dataset"] + self.reward_model_dataset_params["transcript_column"] = config["input_features"][0]["name"] def train_step( self, inputs: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor], should_step: bool = True @@ -70,10 +71,18 @@ def train_step( Returns: A tuple of the 
loss tensor and a dictionary of loss for every output feature. """ - chosen_value = self.reward_model_dataset_params["chosen_value"] - rejected_value = self.reward_model_dataset_params["rejected_value"] - if chosen_value not in inputs or rejected_value not in inputs: - raise ValueError("Reward model preprocessing error: should have chosen/rejected values as table columns") + id_column = self.reward_model_dataset_params["id_column"] + outcome_column = self.reward_model_dataset_params["outcome_column"] + transcript_column = self.reward_model_dataset_params["transcript_column"] + + # Validate inputs + input_names_expected = sorted([id_column, outcome_column, transcript_column]) + input_names_actual = sorted([input_name.split("_")[0] for input_name in inputs.keys()]) + if not input_names_actual == input_names_expected: + raise ValueError( + f"Invalid reward model training data input, expect inputs {input_names_expected}, " + f"got inputs {input_names_actual}." + ) # Other validations if not all( @@ -82,11 +91,21 @@ def train_step( ): raise ValueError("Invalid trainer arguments for RLHF reward model") + # Augment column names to processed versions + for input_name in inputs.keys(): + if id_column in input_name: + id_column = input_name + elif outcome_column in input_name: + outcome_column = input_name + elif transcript_column in input_name: + transcript_column = input_name + # Run forward-propagation of the chosen and rejected inputs with self.distributed.prepare_model_update(self.dist_model, should_step=should_step): # Obtain model predictions and loss - model_output_chosen = self.dist_model((inputs[chosen_value])) - model_output_rejected = self.dist_model((inputs[rejected_value])) + chosen_idx = inputs[outcome_column].index("chosen") + model_output_chosen = self.dist_model((inputs[transcript_column][chosen_idx])) + model_output_rejected = self.dist_model((inputs[transcript_column][1 - chosen_idx])) loss = self.reward_loss_function(model_output_chosen, model_output_rejected) loss = loss / self.gradient_accumulation_steps all_losses = loss From becf832206d87f83d9499e098f12d11f242b21e4 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Tue, 13 Jun 2023 06:43:04 -0700 Subject: [PATCH 14/30] Small edits --- ludwig/trainers/trainer_rlhf.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index 382e043d62c..c0dd5851ce4 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -1,14 +1,15 @@ import logging -import torch from typing import Dict, List, Optional, Tuple +import torch + +from ludwig.distributed.base import DistributedStrategy from ludwig.models.llm import LLM from ludwig.modules.loss_modules import RewardLoss from ludwig.schema.trainer import RewardModelTrainerConfig from ludwig.trainers.registry import register_llm_trainer from ludwig.trainers.trainer import Trainer from ludwig.utils.defaults import default_random_seed -from ludwig.distributed.base import DistributedStrategy logger = logging.getLogger(__name__) @@ -104,8 +105,8 @@ def train_step( with self.distributed.prepare_model_update(self.dist_model, should_step=should_step): # Obtain model predictions and loss chosen_idx = inputs[outcome_column].index("chosen") - model_output_chosen = self.dist_model((inputs[transcript_column][chosen_idx])) - model_output_rejected = self.dist_model((inputs[transcript_column][1 - chosen_idx])) + model_output_chosen = 
self.dist_model(inputs[transcript_column][chosen_idx]) + model_output_rejected = self.dist_model(inputs[transcript_column][1 - chosen_idx]) loss = self.reward_loss_function(model_output_chosen, model_output_rejected) loss = loss / self.gradient_accumulation_steps all_losses = loss From 7d4243f2d516ca96aa10745c41e44fb86be6a3f1 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Tue, 13 Jun 2023 08:14:52 -0700 Subject: [PATCH 15/30] Add trainer, data edits --- ludwig/data/preprocessing.py | 21 +++++++--- ludwig/schema/metadata/configs/loss.yaml | 3 ++ ludwig/trainers/trainer_rlhf.py | 2 +- tests/integration_tests/test_preprocessing.py | 6 +-- tests/integration_tests/test_trainer.py | 38 +++++++++++++++++++ 5 files changed, 60 insertions(+), 10 deletions(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index 32d45634a70..768d66aa122 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1344,15 +1344,20 @@ def build_dataset( # Validate the input dataframe's columns dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) - if not sorted(dataset_df.columns) == dataset_columns_expected: + dataset_columns_actual = sorted(dataset_df.columns) + if "split" in dataset_columns_actual: + dataset_columns_actual.remove("split") + if dataset_columns_actual != dataset_columns_expected: raise ValueError( f"Invalid reward training input dataset, expect columns {dataset_columns_expected}, " - f"got columns {sorted(dataset_df.columns)}." + f"got columns {dataset_columns_actual}." ) # Validate the processed dataset columns - processed_column_names = ["_".join(column_name.split("_")[:2]) for column_name in sorted(proc_cols.keys())] - if not processed_column_names == dataset_columns_expected: + processed_column_names = sorted(["_".join(column_name.split("_")[:2]) for column_name in proc_cols.keys()]) + if "split" in processed_column_names: + processed_column_names.remove("split") + if processed_column_names != dataset_columns_expected: raise ValueError( f"Invalid reward training processed dataset, expect columns {dataset_columns_expected}, " f"got columns {processed_column_names}." 
@@ -1406,8 +1411,8 @@ def build_dataset( # If training a reward model, perform grouping and joining on dataset if mode == "training" and "reward_dataset" in global_preprocessing_parameters: reward_sample_value_map = { - chosen_value: "chosen", - rejected_value: "rejected", + chosen_value: 0, + rejected_value: 1, } # Group dataset rows by ID, aggregate group data @@ -1420,6 +1425,10 @@ def build_dataset( dataset_refactored[transcript_column] = ( dataset_id_groups[transcript_column].apply(list).reset_index()[transcript_column] ) + if "split" in dataset.columns: + dataset_refactored["split"] = ( + dataset_id_groups["split"].apply(lambda x: list(x)[0]).reset_index()["split"] + ) dataset = dataset_refactored return dataset, metadata diff --git a/ludwig/schema/metadata/configs/loss.yaml b/ludwig/schema/metadata/configs/loss.yaml index 240491bac00..50d83a6bb07 100644 --- a/ludwig/schema/metadata/configs/loss.yaml +++ b/ludwig/schema/metadata/configs/loss.yaml @@ -52,3 +52,6 @@ SigmoidCrossEntropyLoss: expected_impact: 3 weight: expected_impact: 2 +RewardLoss: + weight: + expected_impact: 2 diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index c0dd5851ce4..7edb140c2d2 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -104,7 +104,7 @@ def train_step( # Run forward-propagation of the chosen and rejected inputs with self.distributed.prepare_model_update(self.dist_model, should_step=should_step): # Obtain model predictions and loss - chosen_idx = inputs[outcome_column].index("chosen") + chosen_idx = inputs[outcome_column].index(0) model_output_chosen = self.dist_model(inputs[transcript_column][chosen_idx]) model_output_rejected = self.dist_model(inputs[transcript_column][1 - chosen_idx]) loss = self.reward_loss_function(model_output_chosen, model_output_rejected) diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 91e526c3304..3f8ca87d76e 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -202,7 +202,7 @@ def test_reward_model_dataset_refactor(): dataset = train_dataset.dataset transcript_column = config["input_features"][0]["name"] dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) - dataset_columns_actual = ["_".join(column_name.split("_")[:2]) for column_name in sorted(dataset.keys())] + dataset_columns_actual = sorted(["_".join(column_name.split("_")[:2]) for column_name in dataset.keys()]) assert dataset_columns_actual == dataset_columns_expected # Augment column names to processed versions @@ -218,8 +218,8 @@ def test_reward_model_dataset_refactor(): for row_id in range(len(dataset[id_column])): assert len(dataset[outcome_column][row_id]) == 2 assert len(dataset[transcript_column][row_id]) == 2 - assert dataset[outcome_column][row_id][0] in ["chosen", "rejected"] - assert dataset[outcome_column][row_id][1] in ["chosen", "rejected"] + assert dataset[outcome_column][row_id][0] in [0, 1] + assert dataset[outcome_column][row_id][1] in [0, 1] assert dataset[outcome_column][row_id][0] != dataset[outcome_column][row_id][1] diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index b7f623875e8..ece114a1f71 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -17,6 +17,7 @@ binary_feature, category_feature, generate_data, + generate_data_as_dataframe, LocalTestBackend, number_feature, 
RAY_BACKEND_CONFIG, @@ -216,6 +217,43 @@ def test_changing_parameters_on_plateau(tmpdir): model.train(training_set=data_csv, validation_set=val_csv, test_set=test_csv, output_directory=tmpdir) +def test_reward_model_training(tmpdir): + input_features = [text_feature()] + output_features = [number_feature(), category_feature(decoder={"vocab_size": 2})] + backend = LocalTestBackend() + config = {"input_features": input_features, "output_features": output_features} + + # Generate random dataframe + dataframe = generate_data_as_dataframe(input_features, output_features, num_examples=20) + + # Add reward model training pairs + id_column, outcome_column = "", "" + for column_name in dataframe.columns: + if "number" in column_name: + id_column = column_name + elif "category" in column_name: + outcome_column = column_name + dataframe[id_column] = dataframe.index // 2 + chosen_value = "some_value_1" + rejected_value = "some_value_2" + dataframe[outcome_column] = np.where(dataframe.index % 2, rejected_value, chosen_value) + + # Modify config with preprocessing + config["preprocessing"] = { + "reward_dataset": { + "id_column": id_column, + "outcome_column": outcome_column, + "chosen_value": chosen_value, + "rejected_value": rejected_value, + } + } + config[TRAINER] = {"type": "reward_model"} + + # Train Ludwig model with the dataset + ludwig_model = LudwigModel(config, backend=backend) + ludwig_model.train(training_set=dataframe, output_directory=tmpdir) + + @pytest.mark.distributed def test_lightgbm_dataset_partition(ray_cluster_2cpu): # Create a LightGBM model with a Ray backend From 316e2bf281796709e3eaaa2a220da7df3dde2361 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 13:42:18 -0700 Subject: [PATCH 16/30] Add schema changes --- ludwig/schema/decoders/llm_decoders.py | 13 ++++++++++++- ludwig/schema/features/number_feature.py | 14 +++++++++++++- tests/integration_tests/test_trainer.py | 4 ++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/ludwig/schema/decoders/llm_decoders.py b/ludwig/schema/decoders/llm_decoders.py index b64d3eb9272..4be735d3f57 100644 --- a/ludwig/schema/decoders/llm_decoders.py +++ b/ludwig/schema/decoders/llm_decoders.py @@ -1,7 +1,7 @@ from typing import Any, Dict from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import CATEGORY, MODEL_LLM, TEXT +from ludwig.constants import CATEGORY, MODEL_LLM, NUMBER, TEXT from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.base import BaseDecoderConfig from ludwig.schema.decoders.utils import register_decoder_config @@ -49,6 +49,17 @@ def module_name(cls): type: str = schema_utils.ProtectedString("text_extractor") +@DeveloperAPI +@register_decoder_config("number_extractor", [NUMBER], model_types=[MODEL_LLM]) +@ludwig_dataclass +class NumberExtractorDecoderConfig(BaseExtractorDecoderConfig, BaseDecoderConfig): + @classmethod + def module_name(cls): + return "NumberExtractorDecoder" + + type: str = schema_utils.ProtectedString("number_extractor") + + @DeveloperAPI @register_decoder_config("category_extractor", [CATEGORY], model_types=[MODEL_LLM]) @ludwig_dataclass diff --git a/ludwig/schema/features/number_feature.py b/ludwig/schema/features/number_feature.py index 97ea49123c6..e33af68f1c1 100644 --- a/ludwig/schema/features/number_feature.py +++ b/ludwig/schema/features/number_feature.py @@ -1,7 +1,7 @@ from typing import List, Tuple, Union from ludwig.api_annotations import DeveloperAPI -from 
ludwig.constants import MEAN_SQUARED_ERROR, MODEL_ECD, MODEL_GBM, NUMBER +from ludwig.constants import MEAN_SQUARED_ERROR, MODEL_ECD, MODEL_GBM, MODEL_LLM, NUMBER from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.base import BaseDecoderConfig from ludwig.schema.decoders.utils import DecoderDataclassField @@ -20,6 +20,7 @@ gbm_input_config_registry, gbm_output_config_registry, input_mixin_registry, + llm_output_config_registry, output_mixin_registry, ) from ludwig.schema.metadata import FEATURE_METADATA @@ -179,3 +180,14 @@ class NumberDefaultsConfig(NumberInputFeatureConfigMixin, NumberOutputFeatureCon feature_type=NUMBER, default="regressor", ) + + +@DeveloperAPI +@llm_output_config_registry.register(NUMBER) +@ludwig_dataclass +class LLMNumberOutputFeatureConfig(NumberOutputFeatureConfig): + decoder: BaseDecoderConfig = DecoderDataclassField( + MODEL_LLM, + feature_type=NUMBER, + default="number_extractor", + ) diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index ece114a1f71..db3eee18be4 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -247,7 +247,11 @@ def test_reward_model_training(tmpdir): "rejected_value": rejected_value, } } + config["model_type"] = "llm" + config["model_name"] = "gpt2" + config["input_features"][0]["encoder"]["type"] = "passthrough" config[TRAINER] = {"type": "reward_model"} + config["output_features"] = config["output_features"][:1] # Train Ludwig model with the dataset ludwig_model = LudwigModel(config, backend=backend) From dd675d65a70445cd65b8af93b50889b1e5273a85 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 16:32:13 -0700 Subject: [PATCH 17/30] Add refactored processing logic and trainer --- ludwig/data/preprocessing.py | 82 ++++++++++++------- ludwig/schema/decoders/llm_decoders.py | 13 +-- ludwig/schema/features/number_feature.py | 14 +--- .../metadata/configs/preprocessing.yaml | 5 +- ludwig/schema/trainer.py | 4 +- ludwig/trainers/trainer_rlhf.py | 52 +++++------- tests/integration_tests/test_preprocessing.py | 45 ++++------ tests/integration_tests/test_trainer.py | 28 +++---- 8 files changed, 111 insertions(+), 132 deletions(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index 768d66aa122..58665a539cd 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1329,6 +1329,7 @@ def build_dataset( "outcome_column", "chosen_value", "rejected_value", + "transcript_column", ] if not all( param_name in global_preprocessing_parameters["reward_dataset"] for param_name in reward_parameter_names @@ -1340,7 +1341,18 @@ def build_dataset( outcome_column = global_preprocessing_parameters["reward_dataset"]["outcome_column"] chosen_value = global_preprocessing_parameters["reward_dataset"]["chosen_value"] rejected_value = global_preprocessing_parameters["reward_dataset"]["rejected_value"] - transcript_column = config["input_features"][0]["name"] + transcript_column = global_preprocessing_parameters["reward_dataset"]["transcript_column"] + + # Validate the input configuration + if not all([ + len(config["input_features"]) == 1, + len(config["output_features"]) == 1, + config["input_features"][0]["name"] == transcript_column, + config["input_features"][0]["type"] == "text", + config["output_features"][0]["name"] == id_column, + config["output_features"][0]["type"] == "number", + ]): + raise ValueError(f"Invalid reward model 
training configuration, received {config}.") # Validate the input dataframe's columns dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) @@ -1354,25 +1366,20 @@ def build_dataset( ) # Validate the processed dataset columns - processed_column_names = sorted(["_".join(column_name.split("_")[:2]) for column_name in proc_cols.keys()]) - if "split" in processed_column_names: - processed_column_names.remove("split") - if processed_column_names != dataset_columns_expected: + id_column = config["output_features"][0]["proc_column"] + transcript_column = config["input_features"][0]["proc_column"] + proc_columns_expected = sorted([id_column, transcript_column]) + proc_columns_actual = sorted(proc_cols.keys()) + if "split" in proc_columns_actual: + proc_columns_actual.remove("split") + if proc_columns_actual != proc_columns_expected: raise ValueError( - f"Invalid reward training processed dataset, expect columns {dataset_columns_expected}, " - f"got columns {processed_column_names}." + f"Invalid reward training processed dataset, expect columns {proc_columns_expected}, " + f"got columns {proc_columns_actual}." ) - # Augment column names to processed versions - for column_name in proc_cols.keys(): - if id_column in column_name: - proc_cols[column_name] = dataset_df[id_column] - id_column = column_name - elif outcome_column in column_name: - proc_cols[column_name] = dataset_df[outcome_column] - outcome_column = column_name - elif transcript_column in column_name: - transcript_column = column_name + # Add the outcome column to processed columns + proc_cols[outcome_column] = dataset_df[outcome_column] # Implements an outer join of proc_cols dataset = backend.df_engine.df_like(dataset_df, proc_cols) @@ -1410,25 +1417,40 @@ def build_dataset( # If training a reward model, perform grouping and joining on dataset if mode == "training" and "reward_dataset" in global_preprocessing_parameters: - reward_sample_value_map = { - chosen_value: 0, - rejected_value: 1, - } + def parse_id_rows_group(rows_group): + rows_idxs = rows_group.index + + # Retrieve the outcome of the rows in this group + if len(rows_idxs) != 2: + raise ValueError( + f"Incorrect number of text rows for session ID {rows_group.name} when processing the " + f"reward model training dataset: expect 2 rows per session ID, got {len(rows_idxs)} rows." + ) + outcome_first = dataset.loc[rows_idxs[0]][outcome_column] + outcome_second = dataset.loc[rows_idxs[1]][outcome_column] + if not any([ + outcome_first == chosen_value and outcome_second == rejected_value, + outcome_first == rejected_value and outcome_second == chosen_value, + ]): + raise ValueError( + f"Incorrect labeling of the 2 text rows for session ID {rows_group.name} when processing " + f"the reward model training dataset: expect one row to be labeled as {chosen_value}, " + f"and one row to be labeled as {rejected_value}, but got {outcome_first} and {outcome_second}." 
+ ) + + # Return the text transcripts for row in specific order + if outcome_first == chosen_value: + return [rows_group.loc[rows_idxs[0]], rows_group.loc[rows_idxs[1]]] + else: + return [rows_group.loc[rows_idxs[1]], rows_group.loc[rows_idxs[0]]] # Group dataset rows by ID, aggregate group data dataset_id_groups = dataset.groupby(id_column) dataset_refactored = ( - dataset_id_groups[outcome_column] - .apply(lambda x: [reward_sample_value_map[value] for value in list(x)]) - .reset_index() - ) - dataset_refactored[transcript_column] = ( - dataset_id_groups[transcript_column].apply(list).reset_index()[transcript_column] + dataset_id_groups[transcript_column].apply(parse_id_rows_group).reset_index() ) if "split" in dataset.columns: - dataset_refactored["split"] = ( - dataset_id_groups["split"].apply(lambda x: list(x)[0]).reset_index()["split"] - ) + dataset_refactored["split"] = dataset_id_groups["split"].apply(lambda x: list(x)[0]).reset_index()["split"] dataset = dataset_refactored return dataset, metadata diff --git a/ludwig/schema/decoders/llm_decoders.py b/ludwig/schema/decoders/llm_decoders.py index 4be735d3f57..b64d3eb9272 100644 --- a/ludwig/schema/decoders/llm_decoders.py +++ b/ludwig/schema/decoders/llm_decoders.py @@ -1,7 +1,7 @@ from typing import Any, Dict from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import CATEGORY, MODEL_LLM, NUMBER, TEXT +from ludwig.constants import CATEGORY, MODEL_LLM, TEXT from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.base import BaseDecoderConfig from ludwig.schema.decoders.utils import register_decoder_config @@ -49,17 +49,6 @@ def module_name(cls): type: str = schema_utils.ProtectedString("text_extractor") -@DeveloperAPI -@register_decoder_config("number_extractor", [NUMBER], model_types=[MODEL_LLM]) -@ludwig_dataclass -class NumberExtractorDecoderConfig(BaseExtractorDecoderConfig, BaseDecoderConfig): - @classmethod - def module_name(cls): - return "NumberExtractorDecoder" - - type: str = schema_utils.ProtectedString("number_extractor") - - @DeveloperAPI @register_decoder_config("category_extractor", [CATEGORY], model_types=[MODEL_LLM]) @ludwig_dataclass diff --git a/ludwig/schema/features/number_feature.py b/ludwig/schema/features/number_feature.py index e33af68f1c1..97ea49123c6 100644 --- a/ludwig/schema/features/number_feature.py +++ b/ludwig/schema/features/number_feature.py @@ -1,7 +1,7 @@ from typing import List, Tuple, Union from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import MEAN_SQUARED_ERROR, MODEL_ECD, MODEL_GBM, MODEL_LLM, NUMBER +from ludwig.constants import MEAN_SQUARED_ERROR, MODEL_ECD, MODEL_GBM, NUMBER from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.base import BaseDecoderConfig from ludwig.schema.decoders.utils import DecoderDataclassField @@ -20,7 +20,6 @@ gbm_input_config_registry, gbm_output_config_registry, input_mixin_registry, - llm_output_config_registry, output_mixin_registry, ) from ludwig.schema.metadata import FEATURE_METADATA @@ -180,14 +179,3 @@ class NumberDefaultsConfig(NumberInputFeatureConfigMixin, NumberOutputFeatureCon feature_type=NUMBER, default="regressor", ) - - -@DeveloperAPI -@llm_output_config_registry.register(NUMBER) -@ludwig_dataclass -class LLMNumberOutputFeatureConfig(NumberOutputFeatureConfig): - decoder: BaseDecoderConfig = DecoderDataclassField( - MODEL_LLM, - feature_type=NUMBER, - default="number_extractor", - ) diff --git a/ludwig/schema/metadata/configs/preprocessing.yaml 
b/ludwig/schema/metadata/configs/preprocessing.yaml index 9efa16092c2..e657ae7c132 100644 --- a/ludwig/schema/metadata/configs/preprocessing.yaml +++ b/ludwig/schema/metadata/configs/preprocessing.yaml @@ -147,7 +147,8 @@ cache_encoder_embeddings: expected_impact: 1 ui_display_name: Cache Encoder Embeddings reward_dataset: - id_column: The name of the reward model training dataset session ID column - outcome_column: The name of the reward model training dataset chosen/rejected outcome column + id_column: The name of the reward model training dataset session ID (and reward placeholder) column + outcome_column: The name of the reward model training dataset human-labeled chosen/rejected outcome column chosen_value: The value of the string in the outcome column corresponding to chosen samples rejected_value: The value of the string in the outcome column corresponding to rejected samples + transcript_column: The name of the reward model training dataset input text transcript to train with column diff --git a/ludwig/schema/trainer.py b/ludwig/schema/trainer.py index 0198efb1d55..e3f637480e9 100644 --- a/ludwig/schema/trainer.py +++ b/ludwig/schema/trainer.py @@ -812,10 +812,10 @@ class FineTuneTrainerConfig(ECDTrainerConfig): @DeveloperAPI -@register_llm_trainer_schema("reward_model") +@register_trainer_schema("reward_model") @ludwig_dataclass class RewardModelTrainerConfig(ECDTrainerConfig): - """Dataclass that configures most of the hyperparameters used for LLM RLHF reward model training.""" + """Dataclass that configures most of the hyperparameters used for RLHF reward model training.""" # Required for lookup during trainer initialization type: str = schema_utils.ProtectedString("reward_model") diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index 7edb140c2d2..fe910a9cef5 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -7,14 +7,14 @@ from ludwig.models.llm import LLM from ludwig.modules.loss_modules import RewardLoss from ludwig.schema.trainer import RewardModelTrainerConfig -from ludwig.trainers.registry import register_llm_trainer +from ludwig.trainers.registry import register_trainer from ludwig.trainers.trainer import Trainer from ludwig.utils.defaults import default_random_seed logger = logging.getLogger(__name__) -@register_llm_trainer("reward_model") +@register_trainer("reward_model") class RewardModelTrainer(Trainer): @staticmethod def get_schema_cls(): @@ -54,10 +54,8 @@ def __init__( self.reward_loss_function = RewardLoss({}) # Save the reward model dataset parameters - if "preprocessing" not in config or "reward_dataset" not in config["preprocessing"]: - raise ValueError("Invalid reward model training config, expect preprocessing reward attributes.") - self.reward_model_dataset_params = config["preprocessing"]["reward_dataset"] - self.reward_model_dataset_params["transcript_column"] = config["input_features"][0]["name"] + self.id_column = config["output_features"][0]["proc_column"] + self.transcript_column = config["input_features"][0]["proc_column"] def train_step( self, inputs: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor], should_step: bool = True @@ -72,41 +70,33 @@ def train_step( Returns: A tuple of the loss tensor and a dictionary of loss for every output feature. 
""" - id_column = self.reward_model_dataset_params["id_column"] - outcome_column = self.reward_model_dataset_params["outcome_column"] - transcript_column = self.reward_model_dataset_params["transcript_column"] - - # Validate inputs - input_names_expected = sorted([id_column, outcome_column, transcript_column]) - input_names_actual = sorted([input_name.split("_")[0] for input_name in inputs.keys()]) - if not input_names_actual == input_names_expected: - raise ValueError( - f"Invalid reward model training data input, expect inputs {input_names_expected}, " - f"got inputs {input_names_actual}." - ) - - # Other validations if not all( self.use_amp is False, self.evaluate_training_set is True, ): raise ValueError("Invalid trainer arguments for RLHF reward model") - # Augment column names to processed versions - for input_name in inputs.keys(): - if id_column in input_name: - id_column = input_name - elif outcome_column in input_name: - outcome_column = input_name - elif transcript_column in input_name: - transcript_column = input_name + # Validate inputs and targets + input_names_expected = [self.transcript_column] + input_names_actual = list(inputs.keys()) + if not input_names_actual == input_names_expected: + raise ValueError( + f"Invalid reward model training data inputs, expect inputs {input_names_expected}, " + f"got inputs {input_names_actual}." + ) + target_names_expected = [self.id_column] + target_names_actual = list(targets.keys()) + if not target_names_actual == target_names_expected: + raise ValueError( + f"Invalid reward model training data targets, expect targets {target_names_expected}, " + f"got targets {target_names_actual}." + ) # Run forward-propagation of the chosen and rejected inputs with self.distributed.prepare_model_update(self.dist_model, should_step=should_step): # Obtain model predictions and loss - chosen_idx = inputs[outcome_column].index(0) - model_output_chosen = self.dist_model(inputs[transcript_column][chosen_idx]) - model_output_rejected = self.dist_model(inputs[transcript_column][1 - chosen_idx]) + model_output_chosen = self.dist_model(inputs[self.transcript_column][0]) + model_output_rejected = self.dist_model(inputs[self.transcript_column][1]) loss = self.reward_loss_function(model_output_chosen, model_output_rejected) loss = loss / self.gradient_accumulation_steps all_losses = loss diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 3f8ca87d76e..b5f1808d6be 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -163,9 +163,18 @@ def test_strip_whitespace_category(csv_filename, tmpdir): assert len(np.unique(train_ds.dataset[cat_feat[PROC_COLUMN]])) == cat_feat[DECODER]["vocab_size"] -def test_reward_model_dataset_refactor(): - input_features = [text_feature()] - output_features = [number_feature(), category_feature(decoder={"vocab_size": 2})] +def test_rlhf_reward_model_data_preprocessor(): + id_column = "Reward_Session_ID" + outcome_column = "Human_Feedback_Outcome" + chosen_value = "some_value_1" + rejected_value = "some_value_2" + transcript_column = "Transcript" + + # Define the features + input_features = [text_feature( + name=transcript_column, + encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2"})] + output_features = [number_feature(name=id_column)] backend = LocalTestBackend() config = {"input_features": input_features, "output_features": output_features} @@ -173,15 +182,7 @@ def 
test_reward_model_dataset_refactor(): dataframe = generate_data_as_dataframe(input_features, output_features, num_examples=20) # Add reward model training pairs - id_column, outcome_column = "", "" - for column_name in dataframe.columns: - if "number" in column_name: - id_column = column_name - elif "category" in column_name: - outcome_column = column_name dataframe[id_column] = dataframe.index // 2 - chosen_value = "some_value_1" - rejected_value = "some_value_2" dataframe[outcome_column] = np.where(dataframe.index % 2, rejected_value, chosen_value) # Modify config with preprocessing @@ -191,8 +192,10 @@ def test_reward_model_dataset_refactor(): "outcome_column": outcome_column, "chosen_value": chosen_value, "rejected_value": rejected_value, + "transcript_column": transcript_column, } } + config[TRAINER] = {"type": "reward_model"} # Run preprocessing, get output dataset ludwig_model = LudwigModel(config, backend=backend) @@ -200,27 +203,15 @@ def test_reward_model_dataset_refactor(): # Validate the processed dataset columns dataset = train_dataset.dataset - transcript_column = config["input_features"][0]["name"] - dataset_columns_expected = sorted([id_column, outcome_column, transcript_column]) - dataset_columns_actual = sorted(["_".join(column_name.split("_")[:2]) for column_name in dataset.keys()]) + id_column = config["output_features"][0]["proc_column"] + transcript_column = config["input_features"][0]["proc_column"] + dataset_columns_expected = sorted([id_column, transcript_column]) + dataset_columns_actual = sorted(dataset.keys()) assert dataset_columns_actual == dataset_columns_expected - # Augment column names to processed versions - for column_name in dataset.keys(): - if id_column in column_name: - id_column = column_name - elif outcome_column in column_name: - outcome_column = column_name - elif transcript_column in column_name: - transcript_column = column_name - # Validate each row in the processed dataset for row_id in range(len(dataset[id_column])): - assert len(dataset[outcome_column][row_id]) == 2 assert len(dataset[transcript_column][row_id]) == 2 - assert dataset[outcome_column][row_id][0] in [0, 1] - assert dataset[outcome_column][row_id][1] in [0, 1] - assert dataset[outcome_column][row_id][0] != dataset[outcome_column][row_id][1] @pytest.mark.parametrize( diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index db3eee18be4..0523fbff40a 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -217,9 +217,18 @@ def test_changing_parameters_on_plateau(tmpdir): model.train(training_set=data_csv, validation_set=val_csv, test_set=test_csv, output_directory=tmpdir) -def test_reward_model_training(tmpdir): - input_features = [text_feature()] - output_features = [number_feature(), category_feature(decoder={"vocab_size": 2})] +def test_rlhf_reward_model_trainer(tmpdir): + id_column = "Reward_Session_ID" + outcome_column = "Human_Feedback_Outcome" + chosen_value = "some_value_1" + rejected_value = "some_value_2" + transcript_column = "Transcript" + + # Define the features + input_features = [text_feature( + name=transcript_column, + encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2"})] + output_features = [number_feature(name=id_column)] backend = LocalTestBackend() config = {"input_features": input_features, "output_features": output_features} @@ -227,15 +236,7 @@ def test_reward_model_training(tmpdir): dataframe = generate_data_as_dataframe(input_features, 
output_features, num_examples=20) # Add reward model training pairs - id_column, outcome_column = "", "" - for column_name in dataframe.columns: - if "number" in column_name: - id_column = column_name - elif "category" in column_name: - outcome_column = column_name dataframe[id_column] = dataframe.index // 2 - chosen_value = "some_value_1" - rejected_value = "some_value_2" dataframe[outcome_column] = np.where(dataframe.index % 2, rejected_value, chosen_value) # Modify config with preprocessing @@ -245,13 +246,10 @@ def test_reward_model_training(tmpdir): "outcome_column": outcome_column, "chosen_value": chosen_value, "rejected_value": rejected_value, + "transcript_column": transcript_column, } } - config["model_type"] = "llm" - config["model_name"] = "gpt2" - config["input_features"][0]["encoder"]["type"] = "passthrough" config[TRAINER] = {"type": "reward_model"} - config["output_features"] = config["output_features"][:1] # Train Ludwig model with the dataset ludwig_model = LudwigModel(config, backend=backend) From cae42ad66d4af571d826fcf592ed49554b24bda1 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 16:40:14 -0700 Subject: [PATCH 18/30] Style edits --- ludwig/data/preprocessing.py | 33 ++++++++++--------- tests/integration_tests/test_preprocessing.py | 9 +++-- tests/integration_tests/test_trainer.py | 9 +++-- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index 58665a539cd..b4b53161831 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1344,14 +1344,16 @@ def build_dataset( transcript_column = global_preprocessing_parameters["reward_dataset"]["transcript_column"] # Validate the input configuration - if not all([ - len(config["input_features"]) == 1, - len(config["output_features"]) == 1, - config["input_features"][0]["name"] == transcript_column, - config["input_features"][0]["type"] == "text", - config["output_features"][0]["name"] == id_column, - config["output_features"][0]["type"] == "number", - ]): + if not all( + [ + len(config["input_features"]) == 1, + len(config["output_features"]) == 1, + config["input_features"][0]["name"] == transcript_column, + config["input_features"][0]["type"] == "text", + config["output_features"][0]["name"] == id_column, + config["output_features"][0]["type"] == "number", + ] + ): raise ValueError(f"Invalid reward model training configuration, received {config}.") # Validate the input dataframe's columns @@ -1417,6 +1419,7 @@ def build_dataset( # If training a reward model, perform grouping and joining on dataset if mode == "training" and "reward_dataset" in global_preprocessing_parameters: + def parse_id_rows_group(rows_group): rows_idxs = rows_group.index @@ -1428,10 +1431,12 @@ def parse_id_rows_group(rows_group): ) outcome_first = dataset.loc[rows_idxs[0]][outcome_column] outcome_second = dataset.loc[rows_idxs[1]][outcome_column] - if not any([ - outcome_first == chosen_value and outcome_second == rejected_value, - outcome_first == rejected_value and outcome_second == chosen_value, - ]): + if not any( + [ + outcome_first == chosen_value and outcome_second == rejected_value, + outcome_first == rejected_value and outcome_second == chosen_value, + ] + ): raise ValueError( f"Incorrect labeling of the 2 text rows for session ID {rows_group.name} when processing " f"the reward model training dataset: expect one row to be labeled as {chosen_value}, " @@ -1446,9 +1451,7 @@ def 
parse_id_rows_group(rows_group): # Group dataset rows by ID, aggregate group data dataset_id_groups = dataset.groupby(id_column) - dataset_refactored = ( - dataset_id_groups[transcript_column].apply(parse_id_rows_group).reset_index() - ) + dataset_refactored = dataset_id_groups[transcript_column].apply(parse_id_rows_group).reset_index() if "split" in dataset.columns: dataset_refactored["split"] = dataset_id_groups["split"].apply(lambda x: list(x)[0]).reset_index()["split"] dataset = dataset_refactored diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index b5f1808d6be..4a7ade4d24a 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -171,9 +171,12 @@ def test_rlhf_reward_model_data_preprocessor(): transcript_column = "Transcript" # Define the features - input_features = [text_feature( - name=transcript_column, - encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2"})] + input_features = [ + text_feature( + name=transcript_column, + encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2"} + ) + ] output_features = [number_feature(name=id_column)] backend = LocalTestBackend() config = {"input_features": input_features, "output_features": output_features} diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index 0523fbff40a..e3f8ff8fe3e 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -225,9 +225,12 @@ def test_rlhf_reward_model_trainer(tmpdir): transcript_column = "Transcript" # Define the features - input_features = [text_feature( - name=transcript_column, - encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2"})] + input_features = [ + text_feature( + name=transcript_column, + encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2"} + ) + ] output_features = [number_feature(name=id_column)] backend = LocalTestBackend() config = {"input_features": input_features, "output_features": output_features} From a0808cf6e2b20201dd339b9eec34c2ba95e294de Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 16:43:08 -0700 Subject: [PATCH 19/30] Modify tests --- tests/integration_tests/test_preprocessing.py | 2 +- tests/integration_tests/test_trainer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 4a7ade4d24a..bf91631e1c5 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -174,7 +174,7 @@ def test_rlhf_reward_model_data_preprocessor(): input_features = [ text_feature( name=transcript_column, - encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2"} + encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "bert-base-uncased"} ) ] output_features = [number_feature(name=id_column)] diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index e3f8ff8fe3e..d9fc0c41ee0 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -228,7 +228,7 @@ def test_rlhf_reward_model_trainer(tmpdir): input_features = [ text_feature( name=transcript_column, - encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2"} + encoder={"type": 
"auto_transformer", "pretrained_model_name_or_path": "bert-base-uncased"} ) ] output_features = [number_feature(name=id_column)] From 9b46959fa8eb44fd5a4f81bda7ffbaba7f1c5a22 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 17:00:29 -0700 Subject: [PATCH 20/30] More test edits --- tests/integration_tests/test_preprocessing.py | 8 ++++---- tests/integration_tests/test_trainer.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index bf91631e1c5..89e7a313186 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -164,17 +164,17 @@ def test_strip_whitespace_category(csv_filename, tmpdir): def test_rlhf_reward_model_data_preprocessor(): - id_column = "Reward_Session_ID" - outcome_column = "Human_Feedback_Outcome" + id_column = "reward_session_id" + outcome_column = "outcome" chosen_value = "some_value_1" rejected_value = "some_value_2" - transcript_column = "Transcript" + transcript_column = "transcript" # Define the features input_features = [ text_feature( name=transcript_column, - encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "bert-base-uncased"} + encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "bert-base-uncased"}, ) ] output_features = [number_feature(name=id_column)] diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index d9fc0c41ee0..3647998e138 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -218,17 +218,17 @@ def test_changing_parameters_on_plateau(tmpdir): def test_rlhf_reward_model_trainer(tmpdir): - id_column = "Reward_Session_ID" - outcome_column = "Human_Feedback_Outcome" + id_column = "reward_session_id" + outcome_column = "outcome" chosen_value = "some_value_1" rejected_value = "some_value_2" - transcript_column = "Transcript" + transcript_column = "transcript" # Define the features input_features = [ text_feature( name=transcript_column, - encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "bert-base-uncased"} + encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "bert-base-uncased"}, ) ] output_features = [number_feature(name=id_column)] From cc40f7c302eb3946a17a73c65a6e3c84ffa768a1 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 18:36:08 -0700 Subject: [PATCH 21/30] Make reward model a separate model type --- ludwig/constants.py | 1 + ludwig/models/ecd.py | 8 +++++++- ludwig/models/registry.py | 5 +++-- ludwig/schema/decoders/base.py | 5 +++-- ludwig/schema/features/number_feature.py | 14 +++++++++++++- ludwig/schema/features/utils.py | 3 ++- ludwig/schema/model_types/ecd.py | 7 +++++++ ludwig/trainers/__init__.py | 6 ++++++ ludwig/trainers/trainer_rlhf.py | 24 ++++++++---------------- tests/integration_tests/test_trainer.py | 2 +- 10 files changed, 51 insertions(+), 24 deletions(-) diff --git a/ludwig/constants.py b/ludwig/constants.py index 7dfed99414c..9cf063218ea 100644 --- a/ludwig/constants.py +++ b/ludwig/constants.py @@ -270,6 +270,7 @@ MODEL_ECD = "ecd" MODEL_GBM = "gbm" MODEL_LLM = "llm" +MODEL_REWARD = "model_reward" DASK_MODULE_NAME = "dask.dataframe" LUDWIG_VERSION = "ludwig_version" diff --git a/ludwig/models/ecd.py b/ludwig/models/ecd.py index d8d921598d2..015e26195f9 
100644 --- a/ludwig/models/ecd.py +++ b/ludwig/models/ecd.py @@ -6,7 +6,7 @@ import torch from ludwig.combiners.combiners import create_combiner -from ludwig.constants import MODEL_ECD +from ludwig.constants import MODEL_ECD, MODEL_REWARD from ludwig.globals import MODEL_WEIGHTS_FILE_NAME from ludwig.models.base import BaseModel from ludwig.schema.model_types.ecd import ECDModelConfig @@ -177,3 +177,9 @@ def get_augmentation_pipelines(self) -> AugmentationPipelines: ).get_augmentation_pipeline() return AugmentationPipelines(augmentation_pipelines) + + +class RewardModel(ECD): + @staticmethod + def type() -> str: + return MODEL_REWARD diff --git a/ludwig/models/registry.py b/ludwig/models/registry.py index 5cb724cf229..c9c80ff7e1e 100644 --- a/ludwig/models/registry.py +++ b/ludwig/models/registry.py @@ -1,7 +1,7 @@ import logging -from ludwig.constants import MODEL_ECD, MODEL_GBM, MODEL_LLM -from ludwig.models.ecd import ECD +from ludwig.constants import MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_REWARD +from ludwig.models.ecd import ECD, RewardModel from ludwig.models.llm import LLM logger = logging.getLogger(__name__) @@ -24,4 +24,5 @@ def gbm(*args, **kwargs): MODEL_ECD: ECD, MODEL_GBM: gbm, MODEL_LLM: LLM, + MODEL_REWARD: RewardModel, } diff --git a/ludwig/schema/decoders/base.py b/ludwig/schema/decoders/base.py index f1e27833fd8..48f87901f89 100644 --- a/ludwig/schema/decoders/base.py +++ b/ludwig/schema/decoders/base.py @@ -2,7 +2,8 @@ from typing import Dict, List, Tuple, Union from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import BINARY, CATEGORY, MODEL_ECD, MODEL_GBM, MODEL_LLM, NUMBER, SET, TIMESERIES, VECTOR +from ludwig.constants import ( + BINARY, CATEGORY, MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_REWARD, NUMBER, SET, TIMESERIES, VECTOR) from ludwig.schema import common_fields from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.utils import register_decoder_config @@ -108,7 +109,7 @@ def module_name(cls): @DeveloperAPI -@register_decoder_config("regressor", [BINARY, NUMBER], model_types=[MODEL_ECD, MODEL_GBM]) +@register_decoder_config("regressor", [BINARY, NUMBER], model_types=[MODEL_ECD, MODEL_GBM, MODEL_REWARD]) @ludwig_dataclass class RegressorConfig(BaseDecoderConfig): """RegressorConfig is a dataclass that configures the parameters used for a regressor decoder.""" diff --git a/ludwig/schema/features/number_feature.py b/ludwig/schema/features/number_feature.py index 97ea49123c6..38cdd3d954d 100644 --- a/ludwig/schema/features/number_feature.py +++ b/ludwig/schema/features/number_feature.py @@ -1,7 +1,7 @@ from typing import List, Tuple, Union from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import MEAN_SQUARED_ERROR, MODEL_ECD, MODEL_GBM, NUMBER +from ludwig.constants import MEAN_SQUARED_ERROR, MODEL_ECD, MODEL_GBM, MODEL_REWARD, NUMBER from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.base import BaseDecoderConfig from ludwig.schema.decoders.utils import DecoderDataclassField @@ -16,6 +16,7 @@ ecd_defaults_config_registry, ecd_input_config_registry, ecd_output_config_registry, + reward_model_output_config_registry, gbm_defaults_config_registry, gbm_input_config_registry, gbm_output_config_registry, @@ -153,6 +154,17 @@ class ECDNumberOutputFeatureConfig(NumberOutputFeatureConfig): ) +@DeveloperAPI +@reward_model_output_config_registry.register(NUMBER) +@ludwig_dataclass +class RewardModelNumberOutputFeatureConfig(NumberOutputFeatureConfig): + decoder: BaseDecoderConfig = 
DecoderDataclassField( + MODEL_REWARD, + feature_type=NUMBER, + default="regressor", + ) + + @DeveloperAPI @gbm_output_config_registry.register(NUMBER) @ludwig_dataclass diff --git a/ludwig/schema/features/utils.py b/ludwig/schema/features/utils.py index 34abd2eee15..0602702a728 100644 --- a/ludwig/schema/features/utils.py +++ b/ludwig/schema/features/utils.py @@ -1,7 +1,7 @@ from collections import defaultdict from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import MODEL_ECD, MODEL_GBM, MODEL_LLM +from ludwig.constants import MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_REWARD from ludwig.schema import utils as schema_utils from ludwig.utils.registry import Registry @@ -15,6 +15,7 @@ ecd_output_config_registry = output_config_registries[MODEL_ECD] gbm_output_config_registry = output_config_registries[MODEL_GBM] llm_output_config_registry = output_config_registries[MODEL_LLM] +reward_model_output_config_registry = output_config_registries[MODEL_REWARD] input_mixin_registry = Registry() output_mixin_registry = Registry() diff --git a/ludwig/schema/model_types/ecd.py b/ludwig/schema/model_types/ecd.py index 967d12ae143..76ca6fdea3a 100644 --- a/ludwig/schema/model_types/ecd.py +++ b/ludwig/schema/model_types/ecd.py @@ -36,3 +36,10 @@ class ECDModelConfig(ModelConfig): preprocessing: PreprocessingConfig = PreprocessingField().get_default_field() defaults: ECDDefaultsConfig = ECDDefaultsField().get_default_field() hyperopt: Optional[HyperoptConfig] = HyperoptField().get_default_field() + + +@DeveloperAPI +@register_model_type(name="reward_model") +@ludwig_dataclass +class RewardModelConfig(ECDModelConfig): + model_type: str = schema_utils.ProtectedString("reward_model") diff --git a/ludwig/trainers/__init__.py b/ludwig/trainers/__init__.py index e25dba8b547..37a9e656391 100644 --- a/ludwig/trainers/__init__.py +++ b/ludwig/trainers/__init__.py @@ -12,3 +12,9 @@ import ludwig.trainers.trainer_llm # noqa: F401 except ImportError: pass + + +try: + import ludwig.trainers.trainer_rlhf # noqa: F401 +except ImportError: + pass diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index fe910a9cef5..2d0ecdbb630 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -53,10 +53,6 @@ def __init__( # Save the reward model loss function self.reward_loss_function = RewardLoss({}) - # Save the reward model dataset parameters - self.id_column = config["output_features"][0]["proc_column"] - self.transcript_column = config["input_features"][0]["proc_column"] - def train_step( self, inputs: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor], should_step: bool = True ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]: @@ -71,25 +67,21 @@ def train_step( A tuple of the loss tensor and a dictionary of loss for every output feature. """ if not all( - self.use_amp is False, - self.evaluate_training_set is True, + [ + self.use_amp is False, + self.evaluate_training_set is True, + ] ): raise ValueError("Invalid trainer arguments for RLHF reward model") # Validate inputs and targets - input_names_expected = [self.transcript_column] - input_names_actual = list(inputs.keys()) - if not input_names_actual == input_names_expected: + if not len(inputs) == 1: raise ValueError( - f"Invalid reward model training data inputs, expect inputs {input_names_expected}, " - f"got inputs {input_names_actual}." + f"Invalid reward model training data inputs, expect 1 input feature, got {len(inputs)}." 
) - target_names_expected = [self.id_column] - target_names_actual = list(targets.keys()) - if not target_names_actual == target_names_expected: + if not len(targets) == 1: raise ValueError( - f"Invalid reward model training data targets, expect targets {target_names_expected}, " - f"got targets {target_names_actual}." + f"Invalid reward model training data targets, expect 1 target feature, got {len(targets)}." ) # Run forward-propagation of the chosen and rejected inputs diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index 3647998e138..a0dfbe8fe67 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -252,7 +252,7 @@ def test_rlhf_reward_model_trainer(tmpdir): "transcript_column": transcript_column, } } - config[TRAINER] = {"type": "reward_model"} + config["model_type"] = "reward_model" # Train Ludwig model with the dataset ludwig_model = LudwigModel(config, backend=backend) From a93a1b8700fe0210ea15ed4705b488ef2cd6e741 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 19:00:33 -0700 Subject: [PATCH 22/30] Additional refactor edits --- ludwig/constants.py | 2 +- ludwig/models/ecd.py | 11 ++++++++--- ludwig/models/registry.py | 6 +++--- ludwig/schema/decoders/base.py | 4 ++-- ludwig/schema/features/number_feature.py | 10 +++++----- ludwig/schema/features/utils.py | 4 ++-- ludwig/schema/model_types/ecd.py | 8 +++++--- ludwig/schema/trainer.py | 12 +++++------- ludwig/trainers/trainer_rlhf.py | 13 +++++++------ 9 files changed, 38 insertions(+), 32 deletions(-) diff --git a/ludwig/constants.py b/ludwig/constants.py index 9cf063218ea..33a52d84123 100644 --- a/ludwig/constants.py +++ b/ludwig/constants.py @@ -270,7 +270,7 @@ MODEL_ECD = "ecd" MODEL_GBM = "gbm" MODEL_LLM = "llm" -MODEL_REWARD = "model_reward" +MODEL_RWD = "rwd" DASK_MODULE_NAME = "dask.dataframe" LUDWIG_VERSION = "ludwig_version" diff --git a/ludwig/models/ecd.py b/ludwig/models/ecd.py index 015e26195f9..14bc82ed988 100644 --- a/ludwig/models/ecd.py +++ b/ludwig/models/ecd.py @@ -6,7 +6,7 @@ import torch from ludwig.combiners.combiners import create_combiner -from ludwig.constants import MODEL_ECD, MODEL_REWARD +from ludwig.constants import MODEL_ECD, MODEL_RWD from ludwig.globals import MODEL_WEIGHTS_FILE_NAME from ludwig.models.base import BaseModel from ludwig.schema.model_types.ecd import ECDModelConfig @@ -179,7 +179,12 @@ def get_augmentation_pipelines(self) -> AugmentationPipelines: return AugmentationPipelines(augmentation_pipelines) -class RewardModel(ECD): +class RWD(ECD): + """ + This class represents a Reward Model, a model that inputs some feature (i.e. text transcript) and predicts + a single scalar output representing the reward/preference of that input. This model type is used for applications + such as RLHF fine-tuning of LLMs. This model class is a subclass of ECD, and uses most of ECD's code and pathways. 
+ """ @staticmethod def type() -> str: - return MODEL_REWARD + return MODEL_RWD diff --git a/ludwig/models/registry.py b/ludwig/models/registry.py index c9c80ff7e1e..30cecf5b884 100644 --- a/ludwig/models/registry.py +++ b/ludwig/models/registry.py @@ -1,7 +1,7 @@ import logging -from ludwig.constants import MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_REWARD -from ludwig.models.ecd import ECD, RewardModel +from ludwig.constants import MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_RWD +from ludwig.models.ecd import ECD, RWD from ludwig.models.llm import LLM logger = logging.getLogger(__name__) @@ -24,5 +24,5 @@ def gbm(*args, **kwargs): MODEL_ECD: ECD, MODEL_GBM: gbm, MODEL_LLM: LLM, - MODEL_REWARD: RewardModel, + MODEL_RWD: RWD, } diff --git a/ludwig/schema/decoders/base.py b/ludwig/schema/decoders/base.py index 48f87901f89..dbad3d09787 100644 --- a/ludwig/schema/decoders/base.py +++ b/ludwig/schema/decoders/base.py @@ -3,7 +3,7 @@ from ludwig.api_annotations import DeveloperAPI from ludwig.constants import ( - BINARY, CATEGORY, MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_REWARD, NUMBER, SET, TIMESERIES, VECTOR) + BINARY, CATEGORY, MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_RWD, NUMBER, SET, TIMESERIES, VECTOR) from ludwig.schema import common_fields from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.utils import register_decoder_config @@ -109,7 +109,7 @@ def module_name(cls): @DeveloperAPI -@register_decoder_config("regressor", [BINARY, NUMBER], model_types=[MODEL_ECD, MODEL_GBM, MODEL_REWARD]) +@register_decoder_config("regressor", [BINARY, NUMBER], model_types=[MODEL_ECD, MODEL_GBM, MODEL_RWD]) @ludwig_dataclass class RegressorConfig(BaseDecoderConfig): """RegressorConfig is a dataclass that configures the parameters used for a regressor decoder.""" diff --git a/ludwig/schema/features/number_feature.py b/ludwig/schema/features/number_feature.py index 38cdd3d954d..387d6cbf57b 100644 --- a/ludwig/schema/features/number_feature.py +++ b/ludwig/schema/features/number_feature.py @@ -1,7 +1,7 @@ from typing import List, Tuple, Union from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import MEAN_SQUARED_ERROR, MODEL_ECD, MODEL_GBM, MODEL_REWARD, NUMBER +from ludwig.constants import MEAN_SQUARED_ERROR, MODEL_ECD, MODEL_GBM, MODEL_RWD, NUMBER from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.base import BaseDecoderConfig from ludwig.schema.decoders.utils import DecoderDataclassField @@ -16,7 +16,7 @@ ecd_defaults_config_registry, ecd_input_config_registry, ecd_output_config_registry, - reward_model_output_config_registry, + rwd_output_config_registry, gbm_defaults_config_registry, gbm_input_config_registry, gbm_output_config_registry, @@ -155,11 +155,11 @@ class ECDNumberOutputFeatureConfig(NumberOutputFeatureConfig): @DeveloperAPI -@reward_model_output_config_registry.register(NUMBER) +@rwd_output_config_registry.register(NUMBER) @ludwig_dataclass -class RewardModelNumberOutputFeatureConfig(NumberOutputFeatureConfig): +class RWDNumberOutputFeatureConfig(NumberOutputFeatureConfig): decoder: BaseDecoderConfig = DecoderDataclassField( - MODEL_REWARD, + MODEL_RWD, feature_type=NUMBER, default="regressor", ) diff --git a/ludwig/schema/features/utils.py b/ludwig/schema/features/utils.py index 0602702a728..97da367bb5d 100644 --- a/ludwig/schema/features/utils.py +++ b/ludwig/schema/features/utils.py @@ -1,7 +1,7 @@ from collections import defaultdict from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import MODEL_ECD, MODEL_GBM, 
MODEL_LLM, MODEL_REWARD +from ludwig.constants import MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_RWD from ludwig.schema import utils as schema_utils from ludwig.utils.registry import Registry @@ -15,7 +15,7 @@ ecd_output_config_registry = output_config_registries[MODEL_ECD] gbm_output_config_registry = output_config_registries[MODEL_GBM] llm_output_config_registry = output_config_registries[MODEL_LLM] -reward_model_output_config_registry = output_config_registries[MODEL_REWARD] +rwd_output_config_registry = output_config_registries[MODEL_RWD] input_mixin_registry = Registry() output_mixin_registry = Registry() diff --git a/ludwig/schema/model_types/ecd.py b/ludwig/schema/model_types/ecd.py index 76ca6fdea3a..b39da66a616 100644 --- a/ludwig/schema/model_types/ecd.py +++ b/ludwig/schema/model_types/ecd.py @@ -39,7 +39,9 @@ class ECDModelConfig(ModelConfig): @DeveloperAPI -@register_model_type(name="reward_model") +@register_model_type(name="rwd") @ludwig_dataclass -class RewardModelConfig(ECDModelConfig): - model_type: str = schema_utils.ProtectedString("reward_model") +class RWDModelConfig(ECDModelConfig): + """Parameters for RWD (Reward Model).""" + + model_type: str = schema_utils.ProtectedString("rwd") diff --git a/ludwig/schema/trainer.py b/ludwig/schema/trainer.py index e3f637480e9..21111bb0d50 100644 --- a/ludwig/schema/trainer.py +++ b/ludwig/schema/trainer.py @@ -12,6 +12,7 @@ MODEL_ECD, MODEL_GBM, MODEL_LLM, + MODEL_RWD, TRAINING, ) from ludwig.error import ConfigValidationError @@ -812,17 +813,14 @@ class FineTuneTrainerConfig(ECDTrainerConfig): @DeveloperAPI -@register_trainer_schema("reward_model") +@register_trainer_schema(MODEL_RWD) @ludwig_dataclass -class RewardModelTrainerConfig(ECDTrainerConfig): - """Dataclass that configures most of the hyperparameters used for RLHF reward model training.""" - - # Required for lookup during trainer initialization - type: str = schema_utils.ProtectedString("reward_model") +class RWDTrainerConfig(ECDTrainerConfig): + """Dataclass that configures most of the hyperparameters used for Reward Model training.""" base_learning_rate: float = schema_utils.NonNegativeFloat( default=0.0, - description="Base learning rate used for training in the RLHF reward model trainer.", + description="Base learning rate used for training in the Reward Model trainer.", ) diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index 2d0ecdbb630..26a054997e9 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -3,10 +3,11 @@ import torch +from ludwig.constants import MODEL_RWD from ludwig.distributed.base import DistributedStrategy -from ludwig.models.llm import LLM +from ludwig.models.ecd import RWD from ludwig.modules.loss_modules import RewardLoss -from ludwig.schema.trainer import RewardModelTrainerConfig +from ludwig.schema.trainer import RWDTrainerConfig from ludwig.trainers.registry import register_trainer from ludwig.trainers.trainer import Trainer from ludwig.utils.defaults import default_random_seed @@ -14,16 +15,16 @@ logger = logging.getLogger(__name__) -@register_trainer("reward_model") +@register_trainer(MODEL_RWD) class RewardModelTrainer(Trainer): @staticmethod def get_schema_cls(): - return RewardModelTrainerConfig + return RWDTrainerConfig def __init__( self, - config: RewardModelTrainerConfig, - model: LLM, + config: RWDTrainerConfig, + model: RWD, resume: float = False, skip_save_model: bool = False, skip_save_progress: bool = False, From e3117413fccf001dce014d6e675ae134653fe5bf Mon Sep 17 
00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 19:07:29 -0700 Subject: [PATCH 23/30] Style edits --- ludwig/models/ecd.py | 9 +++++---- ludwig/schema/decoders/base.py | 11 ++++++++++- ludwig/schema/features/number_feature.py | 2 +- ludwig/trainers/trainer_rlhf.py | 8 ++++---- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/ludwig/models/ecd.py b/ludwig/models/ecd.py index 14bc82ed988..46b84b47f9e 100644 --- a/ludwig/models/ecd.py +++ b/ludwig/models/ecd.py @@ -180,10 +180,11 @@ def get_augmentation_pipelines(self) -> AugmentationPipelines: class RWD(ECD): - """ - This class represents a Reward Model, a model that inputs some feature (i.e. text transcript) and predicts - a single scalar output representing the reward/preference of that input. This model type is used for applications - such as RLHF fine-tuning of LLMs. This model class is a subclass of ECD, and uses most of ECD's code and pathways. + """This class represents a Reward Model, a model type that takes as input some feature (i.e. text) and predicts + a single scalar output representing the reward/preference of that input. + + This model type is used for applications such as RLHF fine-tuning of LLMs. This model class is a subclass of ECD, + and uses most of ECD's code and pathways. """ @staticmethod def type() -> str: diff --git a/ludwig/schema/decoders/base.py b/ludwig/schema/decoders/base.py index dbad3d09787..7de89e4abfb 100644 --- a/ludwig/schema/decoders/base.py +++ b/ludwig/schema/decoders/base.py @@ -3,7 +3,16 @@ from ludwig.api_annotations import DeveloperAPI from ludwig.constants import ( - BINARY, CATEGORY, MODEL_ECD, MODEL_GBM, MODEL_LLM, MODEL_RWD, NUMBER, SET, TIMESERIES, VECTOR) + BINARY, + CATEGORY, + MODEL_ECD, + MODEL_GBM, + MODEL_LLM, + MODEL_RWD, + NUMBER, + SET, + TIMESERIES, + VECTOR) from ludwig.schema import common_fields from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.utils import register_decoder_config diff --git a/ludwig/schema/features/number_feature.py b/ludwig/schema/features/number_feature.py index 387d6cbf57b..20dfa84e8bd 100644 --- a/ludwig/schema/features/number_feature.py +++ b/ludwig/schema/features/number_feature.py @@ -16,12 +16,12 @@ ecd_defaults_config_registry, ecd_input_config_registry, ecd_output_config_registry, - rwd_output_config_registry, gbm_defaults_config_registry, gbm_input_config_registry, gbm_output_config_registry, input_mixin_registry, output_mixin_registry, + rwd_output_config_registry, ) from ludwig.schema.metadata import FEATURE_METADATA from ludwig.schema.metadata.parameter_metadata import INTERNAL_ONLY diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index 26a054997e9..f4050644019 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -16,7 +16,9 @@ @register_trainer(MODEL_RWD) -class RewardModelTrainer(Trainer): +class RWDTrainer(Trainer): + """This class trains models of type Reward Model.""" + @staticmethod def get_schema_cls(): return RWDTrainerConfig @@ -77,9 +79,7 @@ def train_step( # Validate inputs and targets if not len(inputs) == 1: - raise ValueError( - f"Invalid reward model training data inputs, expect 1 input feature, got {len(inputs)}." - ) + raise ValueError(f"Invalid reward model training data inputs, expect 1 input feature, got {len(inputs)}.") if not len(targets) == 1: raise ValueError( f"Invalid reward model training data targets, expect 1 target feature, got {len(targets)}." 
From 1662827352e21b4b89d865b0c25f5ca0421088ba Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 19:13:43 -0700 Subject: [PATCH 24/30] Add text encoder --- ludwig/models/ecd.py | 1 + ludwig/schema/decoders/base.py | 3 ++- ludwig/schema/features/text_feature.py | 13 +++++++++++++ ludwig/schema/features/utils.py | 1 + 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/ludwig/models/ecd.py b/ludwig/models/ecd.py index 46b84b47f9e..fb12411d625 100644 --- a/ludwig/models/ecd.py +++ b/ludwig/models/ecd.py @@ -186,6 +186,7 @@ class RWD(ECD): This model type is used for applications such as RLHF fine-tuning of LLMs. This model class is a subclass of ECD, and uses most of ECD's code and pathways. """ + @staticmethod def type() -> str: return MODEL_RWD diff --git a/ludwig/schema/decoders/base.py b/ludwig/schema/decoders/base.py index 7de89e4abfb..e7381611c58 100644 --- a/ludwig/schema/decoders/base.py +++ b/ludwig/schema/decoders/base.py @@ -12,7 +12,8 @@ NUMBER, SET, TIMESERIES, - VECTOR) + VECTOR +) from ludwig.schema import common_fields from ludwig.schema import utils as schema_utils from ludwig.schema.decoders.utils import register_decoder_config diff --git a/ludwig/schema/features/text_feature.py b/ludwig/schema/features/text_feature.py index be87778bbee..f9be1c653ae 100644 --- a/ludwig/schema/features/text_feature.py +++ b/ludwig/schema/features/text_feature.py @@ -4,6 +4,7 @@ MODEL_ECD, MODEL_GBM, MODEL_LLM, + MODEL_RWD, NEXT_TOKEN_SOFTMAX_CROSS_ENTROPY, PERPLEXITY, SEQUENCE_SOFTMAX_CROSS_ENTROPY, @@ -30,6 +31,7 @@ llm_input_config_registry, llm_output_config_registry, output_mixin_registry, + rwd_input_config_registry, ) from ludwig.schema.metadata import FEATURE_METADATA from ludwig.schema.metadata.parameter_metadata import INTERNAL_ONLY @@ -91,6 +93,17 @@ class LLMTextInputFeatureConfig(TextInputFeatureConfig): ) +@DeveloperAPI +@rwd_input_config_registry.register(TEXT) +@ludwig_dataclass +class RWDTextInputFeatureConfig(TextInputFeatureConfig): + encoder: BaseEncoderConfig = EncoderDataclassField( + MODEL_RWD, + feature_type=TEXT, + default="parallel_cnn", + ) + + @DeveloperAPI @output_mixin_registry.register(TEXT) @ludwig_dataclass diff --git a/ludwig/schema/features/utils.py b/ludwig/schema/features/utils.py index 97da367bb5d..8c4914f691e 100644 --- a/ludwig/schema/features/utils.py +++ b/ludwig/schema/features/utils.py @@ -11,6 +11,7 @@ ecd_input_config_registry = input_config_registries[MODEL_ECD] gbm_input_config_registry = input_config_registries[MODEL_GBM] llm_input_config_registry = input_config_registries[MODEL_LLM] +rwd_input_config_registry = input_config_registries[MODEL_RWD] ecd_output_config_registry = output_config_registries[MODEL_ECD] gbm_output_config_registry = output_config_registries[MODEL_GBM] From 4860265439c418376c57217ed20f623f82a8faf1 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 19:37:24 -0700 Subject: [PATCH 25/30] Small edits --- ludwig/schema/decoders/base.py | 2 +- ludwig/schema/encoders/text_encoders.py | 4 ++-- ludwig/schema/features/text_feature.py | 2 +- tests/integration_tests/test_trainer.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ludwig/schema/decoders/base.py b/ludwig/schema/decoders/base.py index e7381611c58..d142cb0a908 100644 --- a/ludwig/schema/decoders/base.py +++ b/ludwig/schema/decoders/base.py @@ -12,7 +12,7 @@ NUMBER, SET, TIMESERIES, - VECTOR + VECTOR, ) from 
ludwig.schema import common_fields from ludwig.schema import utils as schema_utils diff --git a/ludwig/schema/encoders/text_encoders.py b/ludwig/schema/encoders/text_encoders.py index acfbad5469f..5373519e8b7 100644 --- a/ludwig/schema/encoders/text_encoders.py +++ b/ludwig/schema/encoders/text_encoders.py @@ -1,7 +1,7 @@ from typing import Callable, Dict, List, Optional, TYPE_CHECKING, Union from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import MODEL_ECD, MODEL_GBM, TEXT +from ludwig.constants import MODEL_ECD, MODEL_GBM, MODEL_RWD, TEXT from ludwig.error import ConfigValidationError from ludwig.schema import utils as schema_utils from ludwig.schema.encoders.sequence_encoders import SequenceEncoderConfig @@ -599,7 +599,7 @@ def module_name(): @DeveloperAPI -@register_encoder_config("bert", TEXT) +@register_encoder_config("bert", TEXT, model_types=[MODEL_RWD]) @ludwig_dataclass class BERTConfig(HFEncoderConfig): """This dataclass configures the schema used for an BERT encoder.""" diff --git a/ludwig/schema/features/text_feature.py b/ludwig/schema/features/text_feature.py index f9be1c653ae..dfd1f08d363 100644 --- a/ludwig/schema/features/text_feature.py +++ b/ludwig/schema/features/text_feature.py @@ -100,7 +100,7 @@ class RWDTextInputFeatureConfig(TextInputFeatureConfig): encoder: BaseEncoderConfig = EncoderDataclassField( MODEL_RWD, feature_type=TEXT, - default="parallel_cnn", + default="bert", ) diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index a0dfbe8fe67..d71a8c8cb6d 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -252,7 +252,7 @@ def test_rlhf_reward_model_trainer(tmpdir): "transcript_column": transcript_column, } } - config["model_type"] = "reward_model" + config["model_type"] = "rwd" # Train Ludwig model with the dataset ludwig_model = LudwigModel(config, backend=backend) From 9cf18d618960e7e4e36777001d3a59c274dca16b Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Wed, 14 Jun 2023 21:04:47 -0700 Subject: [PATCH 26/30] Modify trainer, tests passing --- ludwig/trainers/trainer_rlhf.py | 39 +++++++++++++++++-- tests/integration_tests/test_preprocessing.py | 2 +- tests/integration_tests/test_trainer.py | 5 +++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index f4050644019..e286cae9e6f 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -10,6 +10,7 @@ from ludwig.schema.trainer import RWDTrainerConfig from ludwig.trainers.registry import register_trainer from ludwig.trainers.trainer import Trainer +from ludwig.utils.batch_size_tuner import BatchSizeEvaluator from ludwig.utils.defaults import default_random_seed logger = logging.getLogger(__name__) @@ -56,6 +57,30 @@ def __init__( # Save the reward model loss function self.reward_loss_function = RewardLoss({}) + def _create_batch_size_evaluator(self) -> BatchSizeEvaluator: + trainer = self + + class _TrainerBatchSizeEvaluator(BatchSizeEvaluator): + def reset(self): + trainer.model.reset_metrics() + trainer.optimizer.zero_grad() + + def step(self, batch_size: int): + trainer.distributed.set_batch_size(trainer.dist_model, batch_size) + inputs = { + input_feature_name: [ + input_feature.create_sample_input(batch_size=batch_size).to(trainer.device), + input_feature.create_sample_input(batch_size=batch_size).to(trainer.device), + ] for 
input_feature_name, input_feature in trainer.model.input_features.items() + } + targets = { + output_feature_name: output_feature.create_sample_output(batch_size=batch_size).to(trainer.device) + for output_feature_name, output_feature in trainer.model.output_features.items() + } + trainer.train_step(inputs, targets) + + return _TrainerBatchSizeEvaluator() + def train_step( self, inputs: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor], should_step: bool = True ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]: @@ -84,15 +109,21 @@ def train_step( raise ValueError( f"Invalid reward model training data targets, expect 1 target feature, got {len(targets)}." ) + id_column = list(targets.keys())[0] + transcript_column = list(inputs.keys())[0] # Run forward-propagation of the chosen and rejected inputs with self.distributed.prepare_model_update(self.dist_model, should_step=should_step): # Obtain model predictions and loss - model_output_chosen = self.dist_model(inputs[self.transcript_column][0]) - model_output_rejected = self.dist_model(inputs[self.transcript_column][1]) - loss = self.reward_loss_function(model_output_chosen, model_output_rejected) + inputs_chosen = {transcript_column: inputs[transcript_column][0]} + inputs_rejected = {transcript_column: inputs[transcript_column][1]} + model_output_chosen = self.dist_model(inputs_chosen) + model_output_rejected = self.dist_model(inputs_rejected) + logits_chosen = model_output_chosen[f"{id_column}::logits"] + logits_rejected = model_output_rejected[f"{id_column}::logits"] + loss = self.reward_loss_function(logits_chosen, logits_rejected) loss = loss / self.gradient_accumulation_steps - all_losses = loss + all_losses = {"reward_loss": loss} # Begin the backward pass variables = self.dist_model.parameters() diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 89e7a313186..fadbd159174 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -198,7 +198,7 @@ def test_rlhf_reward_model_data_preprocessor(): "transcript_column": transcript_column, } } - config[TRAINER] = {"type": "reward_model"} + config["model_type"] = "rwd" # Run preprocessing, get output dataset ludwig_model = LudwigModel(config, backend=backend) diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index d71a8c8cb6d..ef62e804f75 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -252,6 +252,11 @@ def test_rlhf_reward_model_trainer(tmpdir): "transcript_column": transcript_column, } } + config[TRAINER] = { + "epochs": 2, + BATCH_SIZE: 4, + "learning_rate": 1.0, + } config["model_type"] = "rwd" # Train Ludwig model with the dataset From 82bbffe2664bbe992103224e71dabb7f10c4a528 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Tue, 20 Jun 2023 07:25:51 -0700 Subject: [PATCH 27/30] Bug fix --- ludwig/data/preprocessing.py | 4 ++-- ludwig/trainers/trainer_rlhf.py | 10 +++------- tests/integration_tests/test_trainer.py | 2 +- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index b4b53161831..7bde94e5898 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1323,7 +1323,7 @@ def build_dataset( proc_cols[proc_column] = backend.df_engine.map_objects(proc_cols[proc_column], lambda x: x.reshape(-1)) # If training a reward model, 
prepare the processed columns - if mode == "training" and "reward_dataset" in global_preprocessing_parameters: + if mode == "training" and global_preprocessing_parameters["reward_dataset"] is not None: reward_parameter_names = [ "id_column", "outcome_column", @@ -1418,7 +1418,7 @@ def build_dataset( dataset = embed_fixed_features(dataset, feature_configs, metadata, backend) # If training a reward model, perform grouping and joining on dataset - if mode == "training" and "reward_dataset" in global_preprocessing_parameters: + if mode == "training" and global_preprocessing_parameters["reward_dataset"] is not None: def parse_id_rows_group(rows_group): rows_idxs = rows_group.index diff --git a/ludwig/trainers/trainer_rlhf.py b/ludwig/trainers/trainer_rlhf.py index e286cae9e6f..011b89b5f66 100644 --- a/ludwig/trainers/trainer_rlhf.py +++ b/ludwig/trainers/trainer_rlhf.py @@ -71,7 +71,8 @@ def step(self, batch_size: int): input_feature_name: [ input_feature.create_sample_input(batch_size=batch_size).to(trainer.device), input_feature.create_sample_input(batch_size=batch_size).to(trainer.device), - ] for input_feature_name, input_feature in trainer.model.input_features.items() + ] + for input_feature_name, input_feature in trainer.model.input_features.items() } targets = { output_feature_name: output_feature.create_sample_output(batch_size=batch_size).to(trainer.device) @@ -94,12 +95,7 @@ def train_step( Returns: A tuple of the loss tensor and a dictionary of loss for every output feature. """ - if not all( - [ - self.use_amp is False, - self.evaluate_training_set is True, - ] - ): + if self.use_amp is True: raise ValueError("Invalid trainer arguments for RLHF reward model") # Validate inputs and targets diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index ef62e804f75..80a3cb8d5e4 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -228,7 +228,7 @@ def test_rlhf_reward_model_trainer(tmpdir): input_features = [ text_feature( name=transcript_column, - encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "bert-base-uncased"}, + encoder={"type": "auto_transformer", "pretrained_model_name_or_path": "gpt2", "trainable": True}, ) ] output_features = [number_feature(name=id_column)] From 3675f42aa5380dc18cac1027a68a729339577176 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Tue, 20 Jun 2023 07:53:35 -0700 Subject: [PATCH 28/30] Small edit --- ludwig/schema/encoders/text_encoders.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ludwig/schema/encoders/text_encoders.py b/ludwig/schema/encoders/text_encoders.py index 5373519e8b7..51d09933983 100644 --- a/ludwig/schema/encoders/text_encoders.py +++ b/ludwig/schema/encoders/text_encoders.py @@ -599,7 +599,7 @@ def module_name(): @DeveloperAPI -@register_encoder_config("bert", TEXT, model_types=[MODEL_RWD]) +@register_encoder_config("bert", TEXT) @ludwig_dataclass class BERTConfig(HFEncoderConfig): """This dataclass configures the schema used for an BERT encoder.""" @@ -3144,7 +3144,7 @@ def module_name(): @DeveloperAPI -@register_encoder_config("tf_idf", TEXT, model_types=[MODEL_ECD, MODEL_GBM]) +@register_encoder_config("tf_idf", TEXT, model_types=[MODEL_ECD, MODEL_GBM, MODEL_RWD]) @ludwig_dataclass class TfIdfEncoderConfig(SequenceEncoderConfig): type: str = schema_utils.ProtectedString("tf_idf") From 4f852ea27cac7432beb2044efb7de46171556916 Mon Sep 17 00:00:00 2001 
From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Tue, 20 Jun 2023 07:56:10 -0700 Subject: [PATCH 29/30] Another edit --- ludwig/schema/features/text_feature.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ludwig/schema/features/text_feature.py b/ludwig/schema/features/text_feature.py index dfd1f08d363..a7f23030a07 100644 --- a/ludwig/schema/features/text_feature.py +++ b/ludwig/schema/features/text_feature.py @@ -100,7 +100,7 @@ class RWDTextInputFeatureConfig(TextInputFeatureConfig): encoder: BaseEncoderConfig = EncoderDataclassField( MODEL_RWD, feature_type=TEXT, - default="bert", + default="tf_idf", ) From b6ef5d161ac1e2fda6cee00573ba2dc3203c1ce1 Mon Sep 17 00:00:00 2001 From: Arvind Sridhar <130104093+asdataminer@users.noreply.github.com> Date: Tue, 20 Jun 2023 08:13:56 -0700 Subject: [PATCH 30/30] Reward loss test --- tests/integration_tests/test_trainer.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index 80a3cb8d5e4..b47f2e41be8 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -37,6 +37,7 @@ from ludwig.data.dataset.ray import RayDataset from ludwig.models.gbm import GBM + from ludwig.modules.loss_modules import RewardLoss from ludwig.schema.model_config import ModelConfig from ludwig.schema.trainer import GBMTrainerConfig from ludwig.trainers.trainer_lightgbm import LightGBMRayTrainer @@ -264,6 +265,15 @@ def test_rlhf_reward_model_trainer(tmpdir): ludwig_model.train(training_set=dataframe, output_directory=tmpdir) +def test_rlhf_reward_model_loss(): + reward_loss_function = RewardLoss({}) + + # Test the reward loss function + assert reward_loss_function(torch.tensor(100.0), torch.tensor(50.0)) < torch.tensor(1e-15) + assert reward_loss_function(torch.tensor(50.0), torch.tensor(100.0)) > torch.tensor(10) + assert reward_loss_function(torch.tensor(100.0), torch.tensor(100.0)) > torch.tensor(0.4) + + @pytest.mark.distributed def test_lightgbm_dataset_partition(ray_cluster_2cpu): # Create a LightGBM model with a Ray backend
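# Illustrative sketch, not part of the patch series above. The assertions added in
# test_rlhf_reward_model_loss (PATCH 30) are consistent with the standard pairwise
# Bradley-Terry reward-model objective. The actual RewardLoss implementation lives in
# ludwig/modules/loss_modules.py and is not shown in this diff, so the function below
# is an assumption about its form, not Ludwig's code.
import torch
import torch.nn.functional as F


def pairwise_reward_loss(chosen_logits: torch.Tensor, rejected_logits: torch.Tensor) -> torch.Tensor:
    # Negative log-probability that the chosen transcript is ranked above the rejected one.
    return -F.logsigmoid(chosen_logits - rejected_logits).mean()


# Under this form, the three test assertions follow directly:
#   pairwise_reward_loss(torch.tensor(100.0), torch.tensor(50.0))  ~ log(1 + e^-50) ~ 2e-22  (< 1e-15)
#   pairwise_reward_loss(torch.tensor(50.0),  torch.tensor(100.0)) ~ log(1 + e^50)  ~ 50     (> 10)
#   pairwise_reward_loss(torch.tensor(100.0), torch.tensor(100.0)) =  ln(2)         ~ 0.693  (> 0.4)
# This also matches how RWDTrainer.train_step uses the loss: it runs two forward passes,
# one on the chosen transcript and one on the rejected transcript from the same session ID,
# and penalizes the model whenever the rejected transcript receives the higher score.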