From ac93ac6dbf3142b31509767f7814fcad8d1ede3c Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Tue, 28 Jun 2022 14:38:56 +0200
Subject: [PATCH 1/9] wip

---
 ludwig/features/sequence_feature.py         | 72 +++++++++++++--------
 tests/integration_tests/test_torchscript.py | 31 +++++++++
 2 files changed, 77 insertions(+), 26 deletions(-)

diff --git a/ludwig/features/sequence_feature.py b/ludwig/features/sequence_feature.py
index 2f0d905b53d..1bbe71d429e 100644
--- a/ludwig/features/sequence_feature.py
+++ b/ludwig/features/sequence_feature.py
@@ -64,6 +64,39 @@
 logger = logging.getLogger(__name__)
 
 
+def process_sequence(
+    sequence_matrix: torch.Tensor,
+    sample_idx: int,
+    sequence: str,
+    lowercase: bool,
+    tokenizer: Any,
+    max_sequence_length: int,
+    unit_to_id: Dict[str, int],
+    unknown_symbol: str,
+    stop_symbol: str,
+):
+    if lowercase:
+        sequence_str: str = sequence.lower()
+    else:
+        sequence_str: str = sequence
+
+    unit_sequence: List[str] = tokenizer(sequence_str)
+    # Add <EOS> if sequence length is less than max_sequence_length. Else, truncate to max_sequence_length.
+    if len(unit_sequence) + 1 < max_sequence_length:
+        sequence_length = len(unit_sequence)
+        sequence_matrix[sample_idx][len(unit_sequence) + 1] = unit_to_id[stop_symbol]
+    else:
+        sequence_length = max_sequence_length - 1
+
+    for i in range(sequence_length):
+        curr_unit = unit_sequence[i]
+        if curr_unit in unit_to_id:
+            curr_id = unit_to_id[curr_unit]
+        else:
+            curr_id = unit_to_id[unknown_symbol]
+        sequence_matrix[sample_idx][i + 1] = curr_id
+
+
 class _SequencePreprocessing(torch.nn.Module):
     """Torchscript-enabled version of preprocessing done by SequenceFeatureMixin.add_feature_data."""
 
@@ -94,34 +127,21 @@ def forward(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
 
         v = [self.computed_fill_value if s == "nan" else s for s in v]
 
-        if self.lowercase:
-            sequences = [sequence.lower() for sequence in v]
-        else:
-            sequences = v
-
-        unit_sequences = self.tokenizer(sequences)
-        # refines type of unit_sequences from Any to List[List[str]]
-        assert torch.jit.isinstance(unit_sequences, List[List[str]]), "unit_sequences is not a list of lists."
-
-        sequence_matrix = torch.full(
-            [len(unit_sequences), self.max_sequence_length], self.unit_to_id[self.padding_symbol]
-        )
+        sequence_matrix = torch.full([len(v), self.max_sequence_length], self.unit_to_id[self.padding_symbol])
         sequence_matrix[:, 0] = self.unit_to_id[self.start_symbol]
-        for sample_idx, unit_sequence in enumerate(unit_sequences):
-            # Add <EOS> if sequence length is less than max_sequence_length. Else, truncate to max_sequence_length.
-            if len(unit_sequence) + 1 < self.max_sequence_length:
-                sequence_length = len(unit_sequence)
-                sequence_matrix[sample_idx][len(unit_sequence) + 1] = self.unit_to_id[self.stop_symbol]
-            else:
-                sequence_length = self.max_sequence_length - 1
-            for i in range(sequence_length):
-                curr_unit = unit_sequence[i]
-                if curr_unit in self.unit_to_id:
-                    curr_id = self.unit_to_id[curr_unit]
-                else:
-                    curr_id = self.unit_to_id[self.unknown_symbol]
-                sequence_matrix[sample_idx][i + 1] = curr_id
+        for sample_idx, sequence in enumerate(v):
+            process_sequence(
+                sequence_matrix,
+                sample_idx,
+                sequence,
+                self.lowercase,
+                self.tokenizer,
+                self.max_sequence_length,
+                self.unit_to_id,
+                self.unknown_symbol,
+                self.stop_symbol,
+            )
 
         return sequence_matrix
 
diff --git a/tests/integration_tests/test_torchscript.py b/tests/integration_tests/test_torchscript.py
index 7165af1ea73..d32487fcff4 100644
--- a/tests/integration_tests/test_torchscript.py
+++ b/tests/integration_tests/test_torchscript.py
@@ -592,6 +592,37 @@ def test_torchscript_postproc_gpu(tmpdir, csv_filename, feature_fn):
         assert utils.is_all_tensors_cuda(output_values), f"{feature_name}.{output_name} tensors are not on GPU"
 
 
+def test_preproc_speed(tmpdir, csv_filename):
+    data_csv_path = os.path.join(tmpdir, csv_filename)
+
+    feature = text_feature()
+    input_features = [
+        feature,
+    ]
+    output_features = [
+        binary_feature(),
+    ]
+
+    config = {"input_features": input_features, "output_features": output_features, TRAINER: {"epochs": 2}}
+    backend = LocalTestBackend()
+    training_data_csv_path = generate_data(input_features, output_features, data_csv_path)
+
+    ludwig_model, _ = initialize_torchscript_module(tmpdir, config, backend, training_data_csv_path)
+
+    df = pd.read_csv(training_data_csv_path)
+    inputs = to_inference_module_input_from_dataframe(
+        df,
+        config,
+        load_paths=True,
+    )
+
+    print(ludwig_model.input_features)
+
+    # sequence_preprocessor = _SequencePreprocessing()
+
+    # inputs[feature[NAME]]
+
+
 def validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path, tolerance=1e-8):
     # Train Ludwig (Pythonic) model:
     ludwig_model, script_module = initialize_torchscript_module(
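Patch 1 above pulls the per-sample loop body of `_SequencePreprocessing.forward` out into a module-level `process_sequence` function, which the later patches build on. A minimal sketch of the underlying pattern, a scripted nn.Module whose forward maps a list of strings to a padded tensor of token ids; the whitespace tokenizer and toy vocabulary here are illustrative assumptions, not Ludwig's tokenizer registry, and SOS/EOS handling is omitted:

from typing import Dict, List

import torch


class ToyPreprocessing(torch.nn.Module):
    def __init__(self, unit_to_id: Dict[str, int], max_sequence_length: int):
        super().__init__()
        self.unit_to_id = unit_to_id
        self.max_sequence_length = max_sequence_length

    def forward(self, v: List[str]) -> torch.Tensor:
        # One row per input string; id 0 doubles as the padding symbol here.
        matrix = torch.zeros([len(v), self.max_sequence_length], dtype=torch.long)
        for sample_idx, sequence in enumerate(v):
            for i, unit in enumerate(sequence.split(" ")):
                if i >= self.max_sequence_length:
                    break
                if unit in self.unit_to_id:
                    matrix[sample_idx][i] = self.unit_to_id[unit]
                else:
                    matrix[sample_idx][i] = 1  # unknown-symbol id in this toy vocabulary
        return matrix


module = torch.jit.script(ToyPreprocessing({"hello": 2, "world": 3}, 4))
print(module(["hello world", "hello there"]))  # token ids, shape (2, 4)

TorchScript infers `unit_to_id` as a `Dict[str, int]` module attribute, so the membership test compiles the same way it does in `process_sequence`.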
From 44e9fd3aa8261bee3c954454605a5eb995e8c823 Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Tue, 28 Jun 2022 17:21:38 +0200
Subject: [PATCH 2/9] add test sequence feature throughput

---
 ludwig/features/sequence_feature.py           | 121 +++++++++++-------
 tests/integration_tests/test_torchscript.py   |   2 -
 .../test_sequence_features_throughput.py      | 109 ++++++++++++++++
 3 files changed, 183 insertions(+), 49 deletions(-)
 create mode 100644 tests/ludwig/features/test_sequence_features_throughput.py

diff --git a/ludwig/features/sequence_feature.py b/ludwig/features/sequence_feature.py
index 1bbe71d429e..55dd6168be7 100644
--- a/ludwig/features/sequence_feature.py
+++ b/ludwig/features/sequence_feature.py
@@ -64,39 +64,6 @@
 logger = logging.getLogger(__name__)
 
 
-def process_sequence(
-    sequence_matrix: torch.Tensor,
-    sample_idx: int,
-    sequence: str,
-    lowercase: bool,
-    tokenizer: Any,
-    max_sequence_length: int,
-    unit_to_id: Dict[str, int],
-    unknown_symbol: str,
-    stop_symbol: str,
-):
-    if lowercase:
-        sequence_str: str = sequence.lower()
-    else:
-        sequence_str: str = sequence
-
-    unit_sequence: List[str] = tokenizer(sequence_str)
-    # Add <EOS> if sequence length is less than max_sequence_length. Else, truncate to max_sequence_length.
-    if len(unit_sequence) + 1 < max_sequence_length:
-        sequence_length = len(unit_sequence)
-        sequence_matrix[sample_idx][len(unit_sequence) + 1] = unit_to_id[stop_symbol]
-    else:
-        sequence_length = max_sequence_length - 1
-
-    for i in range(sequence_length):
-        curr_unit = unit_sequence[i]
-        if curr_unit in unit_to_id:
-            curr_id = unit_to_id[curr_unit]
-        else:
-            curr_id = unit_to_id[unknown_symbol]
-        sequence_matrix[sample_idx][i + 1] = curr_id
-
-
 class _SequencePreprocessing(torch.nn.Module):
     """Torchscript-enabled version of preprocessing done by SequenceFeatureMixin.add_feature_data."""
 
@@ -120,31 +87,91 @@ def __init__(self, metadata: Dict[str, Any]):
         self.unit_to_id = metadata["str2idx"]
         self.computed_fill_value = metadata["preprocessing"]["computed_fill_value"]
 
-    def forward(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
+    def _process_sequence(self, sequence: str) -> torch.Tensor:
+        if self.lowercase:
+            sequence_str: str = sequence.lower()
+        else:
+            sequence_str: str = sequence
+
+        unit_sequence = self.tokenizer(sequence_str)
+        assert torch.jit.isinstance(unit_sequence, List[str])
+
+        sequence_vector = torch.full([self.max_sequence_length], self.unit_to_id[self.padding_symbol])
+        sequence_vector[0] = self.unit_to_id[self.start_symbol]
+        if len(unit_sequence) + 1 < self.max_sequence_length:
+            sequence_length = len(unit_sequence)
+            sequence_vector[len(unit_sequence) + 1] = self.unit_to_id[self.stop_symbol]
+        else:
+            sequence_length = self.max_sequence_length - 1
+
+        for i in range(sequence_length):
+            curr_unit = unit_sequence[i]
+            if curr_unit in self.unit_to_id:
+                curr_id = self.unit_to_id[curr_unit]
+            else:
+                curr_id = self.unit_to_id[self.unknown_symbol]
+            sequence_vector[i + 1] = curr_id
+        return sequence_vector
+
+    @torch.jit.export
+    def forward_old(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
         """Takes a list of strings and returns a tensor of token ids."""
         if not torch.jit.isinstance(v, List[str]):
             raise ValueError(f"Unsupported input: {v}")
 
         v = [self.computed_fill_value if s == "nan" else s for s in v]
 
-        sequence_matrix = torch.full([len(v), self.max_sequence_length], self.unit_to_id[self.padding_symbol])
+        if self.lowercase:
+            sequences = [sequence.lower() for sequence in v]
+        else:
+            sequences = v
+
+        unit_sequences = self.tokenizer(sequences)
+        # refines type of unit_sequences from Any to List[List[str]]
+        assert torch.jit.isinstance(unit_sequences, List[List[str]]), "unit_sequences is not a list of lists."
+
+        sequence_matrix = torch.full(
+            [len(unit_sequences), self.max_sequence_length], self.unit_to_id[self.padding_symbol]
+        )
         sequence_matrix[:, 0] = self.unit_to_id[self.start_symbol]
+        for sample_idx, unit_sequence in enumerate(unit_sequences):
+            # Add <EOS> if sequence length is less than max_sequence_length. Else, truncate to max_sequence_length.
+            if len(unit_sequence) + 1 < self.max_sequence_length:
+                sequence_length = len(unit_sequence)
+                sequence_matrix[sample_idx][len(unit_sequence) + 1] = self.unit_to_id[self.stop_symbol]
+            else:
+                sequence_length = self.max_sequence_length - 1
-        for sample_idx, sequence in enumerate(v):
-            process_sequence(
-                sequence_matrix,
-                sample_idx,
-                sequence,
-                self.lowercase,
-                self.tokenizer,
-                self.max_sequence_length,
-                self.unit_to_id,
-                self.unknown_symbol,
-                self.stop_symbol,
-            )
+            for i in range(sequence_length):
+                curr_unit = unit_sequence[i]
+                if curr_unit in self.unit_to_id:
+                    curr_id = self.unit_to_id[curr_unit]
+                else:
+                    curr_id = self.unit_to_id[self.unknown_symbol]
+                sequence_matrix[sample_idx][i + 1] = curr_id
 
         return sequence_matrix
 
+    def forward(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
+        """Takes a list of strings and returns a tensor of token ids."""
+        if not torch.jit.isinstance(v, List[str]):
+            raise ValueError(f"Unsupported input: {v}")
+
+        v = [self.computed_fill_value if s == "nan" else s for s in v]
+
+        sequence_matrix = []
+        for sequence in v:
+            sequence_vector = self._process_sequence(sequence)
+            sequence_matrix.append(sequence_vector)
+        return torch.stack(sequence_matrix)
+
+    @torch.jit.unused
+    def forward_series(self, column, backend) -> torch.Tensor:
+        column = backend.df_engine.map_objects(column, lambda s: self.computed_fill_value if s == "nan" else s)
+        column = backend.df_engine.map_objects(column, self._process_sequence)
+        sequence_matrix = backend.df_engine.compute(column).values.tolist()
+        return torch.stack(sequence_matrix)
+
 
 class _SequencePostprocessing(torch.nn.Module):
     def __init__(self, metadata: Dict[str, Any]):

diff --git a/tests/integration_tests/test_torchscript.py b/tests/integration_tests/test_torchscript.py
index d32487fcff4..83653ffd34e 100644
--- a/tests/integration_tests/test_torchscript.py
+++ b/tests/integration_tests/test_torchscript.py
@@ -616,8 +616,6 @@ def test_preproc_speed(tmpdir, csv_filename):
         load_paths=True,
     )
 
-    print(ludwig_model.input_features)
-
     # sequence_preprocessor = _SequencePreprocessing()
 
     # inputs[feature[NAME]]

diff --git a/tests/ludwig/features/test_sequence_features_throughput.py b/tests/ludwig/features/test_sequence_features_throughput.py
new file mode 100644
index 00000000000..2c6385a7760
--- /dev/null
+++ b/tests/ludwig/features/test_sequence_features_throughput.py
@@ -0,0 +1,109 @@
+import collections
+import random
+import time
+import os
+
+import numpy as np
+import pandas as pd
+import torch
+from ludwig.api import LudwigModel
+from ludwig.data.preprocessing import preprocess_for_prediction
+
+from ludwig.constants import TRAINER, NAME, TYPE
+from ludwig.features.feature_registries import input_type_registry
+from ludwig.models.inference import to_inference_module_input_from_dataframe
+from ludwig.utils.misc_utils import get_from_registry
+from tests.integration_tests.utils import (
+    RAY_BACKEND_CONFIG,
+    binary_feature,
+    generate_data,
+    init_backend,
+    text_feature,
+)
+
+SEQ_SIZE = 6
+
+
+def test_text_preproc_module_space_punct_tokenizer_speed(tmpdir):
+    feature_config = text_feature()
+    input_features = [feature_config]
+    output_features = [binary_feature()]
+    config = {"input_features": input_features, "output_features": output_features, TRAINER: {"epochs": 1}}
+    data_csv_path = generate_data(input_features, output_features, os.path.join(tmpdir, "data.csv"), num_examples=100)
+
+    backend = "ray"
+    with init_backend(backend):
+        if backend == "ray":
+            backend = RAY_BACKEND_CONFIG
+            backend["processor"]["type"] = "dask"
+
+        ludwig_model = LudwigModel(config, backend=backend)
+        _, _, output_directory = ludwig_model.train(
+            dataset=data_csv_path,
+            output_directory=os.path.join(tmpdir, "output"),
+        )
+
+        feature = get_from_registry(feature_config[TYPE], input_type_registry)
+        preprocessing_module = feature.create_preproc_module(ludwig_model.training_set_metadata[feature_config[NAME]])
+        scripted_preprocessing_module = torch.jit.script(preprocessing_module)
+
+        scripted_model = ludwig_model.to_torchscript()
+        scripted_model_path = os.path.join(tmpdir, "inference_module.pt")
+        torch.jit.save(scripted_model, scripted_model_path)
+        scripted_model = torch.jit.load(scripted_model_path)
+
+        df = pd.read_csv(data_csv_path)
+        batch_sizes = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
+
+        print("warming up...")
+        for i in range(100):
+            batch_size = random.choice(batch_sizes)
+            inputs_df = df.sample(n=batch_size, replace=True)
+            inputs_series = ludwig_model.backend.df_engine.from_pandas(inputs_df)[feature_config[NAME]]
+            inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
+
+            scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
+            scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
+            scripted_model.preprocessor_forward(inputs_dict)
+
+        print("benchmarking...")
+        for batch_size in batch_sizes:
+            inputs_df = df.sample(n=batch_size, replace=True)
+            inputs_series = ludwig_model.backend.df_engine.from_pandas(inputs_df)[feature_config[NAME]]
+            inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
+
+            method_to_durations = collections.defaultdict(list)
+
+            for i in range(100):
+                start_t = time.time()
+                scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
+                method_to_durations["old"].append(time.time() - start_t)
+
+                start_t = time.time()
+                scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
+                method_to_durations["new"].append(time.time() - start_t)
+
+                start_t = time.time()
+                preprocessing_module.forward_series(inputs_series, ludwig_model.backend)
+                method_to_durations["series"].append(time.time() - start_t)
+
+                start_t = time.time()
+                preprocess_for_prediction(
+                    ludwig_model.config,
+                    dataset=inputs_df,
+                    training_set_metadata=ludwig_model.training_set_metadata,
+                    include_outputs=False,
+                    backend=ludwig_model.backend,
+                )
+                method_to_durations["ludwig"].append(time.time() - start_t)
+
+                start_t = time.time()
+                scripted_model.preprocessor_forward(inputs_dict)
+                method_to_durations["ts_inf"].append(time.time() - start_t)
+
+            print()
+            print(f"Batch size: {batch_size}")
+            for method, durations in method_to_durations.items():
+                print(f"\t{method}:\t{np.mean(durations):.8f} +/- {np.std(durations):.8f}")
+
+    print("done.")

From 6c8aefeb83d7ea01c7d3192c6c7680b9163ab3fc Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Tue, 28 Jun 2022 18:55:00 +0200
Subject: [PATCH 3/9] added ludwig tests

---
 ludwig/features/sequence_feature.py           |  54 ++++----
 .../test_sequence_features_throughput.py      | 117 +++++++++++++-----
 2 files changed, 113 insertions(+), 58 deletions(-)

diff --git a/ludwig/features/sequence_feature.py b/ludwig/features/sequence_feature.py
index 55dd6168be7..0fbc4db074f 100644
--- a/ludwig/features/sequence_feature.py
+++ b/ludwig/features/sequence_feature.py
@@ -87,32 +87,6 @@ def __init__(self, metadata: Dict[str, Any]):
         self.unit_to_id = metadata["str2idx"]
         self.computed_fill_value = metadata["preprocessing"]["computed_fill_value"]
 
-    def _process_sequence(self, sequence: str) -> torch.Tensor:
-        if self.lowercase:
-            sequence_str: str = sequence.lower()
-        else:
-            sequence_str: str = sequence
-
-        unit_sequence = self.tokenizer(sequence_str)
-        assert torch.jit.isinstance(unit_sequence, List[str])
-
-        sequence_vector = torch.full([self.max_sequence_length], self.unit_to_id[self.padding_symbol])
-        sequence_vector[0] = self.unit_to_id[self.start_symbol]
-        if len(unit_sequence) + 1 < self.max_sequence_length:
-            sequence_length = len(unit_sequence)
-            sequence_vector[len(unit_sequence) + 1] = self.unit_to_id[self.stop_symbol]
-        else:
-            sequence_length = self.max_sequence_length - 1
-
-        for i in range(sequence_length):
-            curr_unit = unit_sequence[i]
-            if curr_unit in self.unit_to_id:
-                curr_id = self.unit_to_id[curr_unit]
-            else:
-                curr_id = self.unit_to_id[self.unknown_symbol]
-            sequence_vector[i + 1] = curr_id
-        return sequence_vector
-
     @torch.jit.export
     def forward_old(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
         """Takes a list of strings and returns a tensor of token ids."""
@@ -152,7 +126,7 @@ def forward_old(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
 
         return sequence_matrix
 
-    def forward(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
+    def forward_new(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
         """Takes a list of strings and returns a tensor of token ids."""
         if not torch.jit.isinstance(v, List[str]):
             raise ValueError(f"Unsupported input: {v}")
@@ -172,6 +146,32 @@ def forward_series(self, column, backend) -> torch.Tensor:
         sequence_matrix = backend.df_engine.compute(column).values.tolist()
         return torch.stack(sequence_matrix)
 
+    def _process_sequence(self, sequence: str) -> torch.Tensor:
+        if self.lowercase:
+            sequence_str: str = sequence.lower()
+        else:
+            sequence_str: str = sequence
+
+        unit_sequence = self.tokenizer(sequence_str)
+        assert torch.jit.isinstance(unit_sequence, List[str])
+
+        sequence_vector = torch.full([self.max_sequence_length], self.unit_to_id[self.padding_symbol])
+        sequence_vector[0] = self.unit_to_id[self.start_symbol]
+        if len(unit_sequence) + 1 < self.max_sequence_length:
+            sequence_length = len(unit_sequence)
+            sequence_vector[len(unit_sequence) + 1] = self.unit_to_id[self.stop_symbol]
+        else:
+            sequence_length = self.max_sequence_length - 1
+
+        for i in range(sequence_length):
+            curr_unit = unit_sequence[i]
+            if curr_unit in self.unit_to_id:
+                curr_id = self.unit_to_id[curr_unit]
+            else:
+                curr_id = self.unit_to_id[self.unknown_symbol]
+            sequence_vector[i + 1] = curr_id
+        return sequence_vector
+
 
 class _SequencePostprocessing(torch.nn.Module):
     def __init__(self, metadata: Dict[str, Any]):

diff --git a/tests/ludwig/features/test_sequence_features_throughput.py b/tests/ludwig/features/test_sequence_features_throughput.py
index 2c6385a7760..2ef6d194116 100644
--- a/tests/ludwig/features/test_sequence_features_throughput.py
+++ b/tests/ludwig/features/test_sequence_features_throughput.py
@@ -6,9 +6,11 @@
 import numpy as np
 import pandas as pd
 import torch
+import yaml
+from tqdm import tqdm
+
 from ludwig.api import LudwigModel
 from ludwig.data.preprocessing import preprocess_for_prediction
-
 from ludwig.constants import TRAINER, NAME, TYPE
 from ludwig.features.feature_registries import input_type_registry
 from ludwig.models.inference import to_inference_module_input_from_dataframe
@@ -21,15 +23,43 @@
     text_feature,
 )
 
-SEQ_SIZE = 6
+MODE = "AGNEWS"
+AGNEWS_CONFIG_STR = """
+input_features:
+  - name: description
+    type: text
+output_features:
+  - name: class_index
+    type: category
+trainer:
+  batch_size: 1024
+  train_steps: 1
+  steps_per_checkpoint: 1
+  early_stop: 0
+backend:
+  type: ray
+  processor:
+    type: dask
+  trainer:
+    num_workers: 1
+"""
+AGNEWS_CSV_PATH = "/Users/geoffreyangus/.ludwig_cache/agnews_1.0/processed/agnews_tiny.csv"
+# AGNEWS_CSV_PATH = "/home/ray/agnews_tiny.csv"
 
 
 def test_text_preproc_module_space_punct_tokenizer_speed(tmpdir):
-    feature_config = text_feature()
-    input_features = [feature_config]
-    output_features = [binary_feature()]
-    config = {"input_features": input_features, "output_features": output_features, TRAINER: {"epochs": 1}}
-    data_csv_path = generate_data(input_features, output_features, os.path.join(tmpdir, "data.csv"), num_examples=100)
+    if MODE == "AGNEWS":
+        config = yaml.safe_load(AGNEWS_CONFIG_STR)
+        feature_config = config["input_features"][0]
+        data_csv_path = AGNEWS_CSV_PATH
+    else:
+        feature_config = text_feature()
+        input_features = [feature_config]
+        output_features = [binary_feature()]
+        config = {"input_features": input_features, "output_features": output_features, TRAINER: {"epochs": 1}}
+        data_csv_path = generate_data(
+            input_features, output_features, os.path.join(tmpdir, "data.csv"), num_examples=100
+        )
 
     backend = "ray"
     with init_backend(backend):
@@ -41,7 +71,9 @@ def test_text_preproc_module_space_punct_tokenizer_speed(tmpdir):
         ludwig_model = LudwigModel(config, backend=backend)
         _, _, output_directory = ludwig_model.train(
             dataset=data_csv_path,
             output_directory=os.path.join(tmpdir, "output"),
+            skip_save_processed_input=True,
         )
+        config = ludwig_model.config
 
         feature = get_from_registry(feature_config[TYPE], input_type_registry)
         preprocessing_module = feature.create_preproc_module(ludwig_model.training_set_metadata[feature_config[NAME]])
         scripted_preprocessing_module = torch.jit.script(preprocessing_module)
@@ -52,54 +84,77 @@ def test_text_preproc_module_space_punct_tokenizer_speed(tmpdir):
         torch.jit.save(scripted_model, scripted_model_path)
         scripted_model = torch.jit.load(scripted_model_path)
 
+        model_path = os.path.join(output_directory, "model")
+        ludwig_model = LudwigModel.load(model_path, backend="local")
+
         df = pd.read_csv(data_csv_path)
-        batch_sizes = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
+        batch_sizes = [
+            # 1,
+            # 2,
+            # 4,
+            # 8,
+            # 16,
+            # 32,
+            # 64,
+            # 128,
+            # 256,
+            # 512,
+            # 1024,
+            2048,
+        ]
 
         print("warming up...")
-        for i in range(100):
+        for _ in tqdm(range(10)):
             batch_size = random.choice(batch_sizes)
             inputs_df = df.sample(n=batch_size, replace=True)
-            inputs_series = ludwig_model.backend.df_engine.from_pandas(inputs_df)[feature_config[NAME]]
-            inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
 
+            inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
+            # scripted_model.preprocessor_forward(inputs_dict)
             scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
             scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
-            scripted_model.preprocessor_forward(inputs_dict)
 
         print("benchmarking...")
         for batch_size in batch_sizes:
             inputs_df = df.sample(n=batch_size, replace=True)
-            inputs_series = ludwig_model.backend.df_engine.from_pandas(inputs_df)[feature_config[NAME]]
             inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
+            inputs_series = ludwig_model.backend.df_engine.from_pandas(inputs_df)[feature_config[NAME]]
 
             method_to_durations = collections.defaultdict(list)
 
-            for i in range(100):
+            for _ in tqdm(range(10)):
                 start_t = time.time()
                 scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
-                method_to_durations["old"].append(time.time() - start_t)
+                method_to_durations["old_preproc"].append(time.time() - start_t)
 
                 start_t = time.time()
                 scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
-                method_to_durations["new"].append(time.time() - start_t)
+                method_to_durations["new_preproc"].append(time.time() - start_t)
 
                 start_t = time.time()
                 preprocessing_module.forward_series(inputs_series, ludwig_model.backend)
-                method_to_durations["series"].append(time.time() - start_t)
-
-                start_t = time.time()
-                preprocess_for_prediction(
-                    ludwig_model.config,
-                    dataset=inputs_df,
-                    training_set_metadata=ludwig_model.training_set_metadata,
-                    include_outputs=False,
-                    backend=ludwig_model.backend,
-                )
-                method_to_durations["ludwig"].append(time.time() - start_t)
-
-                start_t = time.time()
-                scripted_model.preprocessor_forward(inputs_dict)
-                method_to_durations["ts_inf"].append(time.time() - start_t)
+                method_to_durations["series_preproc"].append(time.time() - start_t)
+
+                # start_t = time.time()
+                # preprocess_for_prediction(
+                #     ludwig_model.config,
+                #     dataset=inputs_df,
+                #     training_set_metadata=ludwig_model.training_set_metadata,
+                #     include_outputs=False,
+                #     backend=ludwig_model.backend,
+                # )
+                # method_to_durations["ludwig_preproc"].append(time.time() - start_t)
+
+                # start_t = time.time()
+                # scripted_model.preprocessor_forward(inputs_dict)
+                # method_to_durations["ts_inf_preproc"].append(time.time() - start_t)
+
+                # start_t = time.time()
+                # ludwig_model.predict(inputs_df)
+                # method_to_durations["ludwig_predict"].append(time.time() - start_t)
+
+                # start_t = time.time()
+                # scripted_model(inputs_dict)
+                # method_to_durations["ts_inf_predict"].append(time.time() - start_t)
 
             print()
             print(f"Batch size: {batch_size}")

From 80376f93bc8825e4af60fe67453599c599d779f4 Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Tue, 28 Jun 2022 22:55:18 +0200
Subject: [PATCH 4/9] final benchmarking commit– shows 2x improvement

---
 ludwig/features/sequence_feature.py           | 20 ++++--
 tests/integration_tests/test_torchscript.py   | 29 ---------
 .../test_sequence_features_throughput.py      | 61 ++++++++++---------
 3 files changed, 45 insertions(+), 65 deletions(-)

diff --git a/ludwig/features/sequence_feature.py b/ludwig/features/sequence_feature.py
index 0fbc4db074f..0a0e8ffb566 100644
--- a/ludwig/features/sequence_feature.py
+++ b/ludwig/features/sequence_feature.py
@@ -126,27 +126,35 @@ def forward_old(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
 
         return sequence_matrix
 
-    def forward_new(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
+    def forward(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
         """Takes a list of strings and returns a tensor of token ids."""
         if not torch.jit.isinstance(v, List[str]):
             raise ValueError(f"Unsupported input: {v}")
 
-        v = [self.computed_fill_value if s == "nan" else s for s in v]
+        futures: List[torch.jit.Future[torch.Tensor]] = []
+        for sequence in v:
+            futures.append(
+                torch.jit.fork(
+                    self._process_sequence,
+                    sequence,
+                )
+            )
 
         sequence_matrix = []
-        for sequence in v:
-            sequence_vector = self._process_sequence(sequence)
-            sequence_matrix.append(sequence_vector)
+        for future in futures:
+            sequence_matrix.append(torch.jit.wait(future))
+
         return torch.stack(sequence_matrix)
 
     @torch.jit.unused
     def forward_series(self, column, backend) -> torch.Tensor:
-        column = backend.df_engine.map_objects(column, lambda s: self.computed_fill_value if s == "nan" else s)
         column = backend.df_engine.map_objects(column, self._process_sequence)
         sequence_matrix = backend.df_engine.compute(column).values.tolist()
         return torch.stack(sequence_matrix)
 
     def _process_sequence(self, sequence: str) -> torch.Tensor:
+        sequence = self.computed_fill_value if sequence == "nan" else sequence
+
         if self.lowercase:
             sequence_str: str = sequence.lower()
         else:

diff --git a/tests/integration_tests/test_torchscript.py b/tests/integration_tests/test_torchscript.py
index 83653ffd34e..7165af1ea73 100644
--- a/tests/integration_tests/test_torchscript.py
+++ b/tests/integration_tests/test_torchscript.py
@@ -592,35 +592,6 @@ def test_torchscript_postproc_gpu(tmpdir, csv_filename, feature_fn):
         assert utils.is_all_tensors_cuda(output_values), f"{feature_name}.{output_name} tensors are not on GPU"
 
 
-def test_preproc_speed(tmpdir, csv_filename):
-    data_csv_path = os.path.join(tmpdir, csv_filename)
-
-    feature = text_feature()
-    input_features = [
-        feature,
-    ]
-    output_features = [
-        binary_feature(),
-    ]
-
-    config = {"input_features": input_features, "output_features": output_features, TRAINER: {"epochs": 2}}
-    backend = LocalTestBackend()
-    training_data_csv_path = generate_data(input_features, output_features, data_csv_path)
-
-    ludwig_model, _ = initialize_torchscript_module(tmpdir, config, backend, training_data_csv_path)
-
-    df = pd.read_csv(training_data_csv_path)
-    inputs = to_inference_module_input_from_dataframe(
-        df,
-        config,
-        load_paths=True,
-    )
-
-    # sequence_preprocessor = _SequencePreprocessing()
-
-    # inputs[feature[NAME]]
-
-
 def validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path, tolerance=1e-8):
     # Train Ludwig (Pythonic) model:
     ludwig_model, script_module = initialize_torchscript_module(

diff --git a/tests/ludwig/features/test_sequence_features_throughput.py b/tests/ludwig/features/test_sequence_features_throughput.py
index 2ef6d194116..822eab9f5af 100644
--- a/tests/ludwig/features/test_sequence_features_throughput.py
+++ b/tests/ludwig/features/test_sequence_features_throughput.py
@@ -43,8 +43,8 @@
   trainer:
     num_workers: 1
 """
-AGNEWS_CSV_PATH = "/Users/geoffreyangus/.ludwig_cache/agnews_1.0/processed/agnews_tiny.csv"
-# AGNEWS_CSV_PATH = "/home/ray/agnews_tiny.csv"
+# AGNEWS_CSV_PATH = "/Users/geoffreyangus/.ludwig_cache/agnews_1.0/processed/agnews_tiny.csv"
+AGNEWS_CSV_PATH = "/home/ray/agnews_tiny.csv"
 
 
 def test_text_preproc_module_space_punct_tokenizer_speed(tmpdir):
@@ -89,17 +89,17 @@ def test_text_preproc_module_space_punct_tokenizer_speed(tmpdir):
         df = pd.read_csv(data_csv_path)
         batch_sizes = [
-            # 1,
-            # 2,
-            # 4,
-            # 8,
-            # 16,
-            # 32,
-            # 64,
-            # 128,
-            # 256,
-            # 512,
-            # 1024,
+            1,
+            2,
+            4,
+            8,
+            16,
+            32,
+            64,
+            128,
+            256,
+            512,
+            1024,
             2048,
         ]
 
         print("warming up...")
         for _ in tqdm(range(10)):
             batch_size = random.choice(batch_sizes)
             inputs_df = df.sample(n=batch_size, replace=True)
 
             inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
+            scripted_model(inputs_dict)
             # scripted_model.preprocessor_forward(inputs_dict)
-            scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
-            scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
+            # scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
+            # scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
 
         print("benchmarking...")
         for batch_size in batch_sizes:
             inputs_df = df.sample(n=batch_size, replace=True)
             inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
             inputs_series = ludwig_model.backend.df_engine.from_pandas(inputs_df)[feature_config[NAME]]
 
             method_to_durations = collections.defaultdict(list)
 
             for _ in tqdm(range(10)):
-                start_t = time.time()
-                scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
-                method_to_durations["old_preproc"].append(time.time() - start_t)
+                # start_t = time.time()
+                # scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
+                # method_to_durations["old_preproc"].append(time.time() - start_t)
 
-                start_t = time.time()
-                scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
-                method_to_durations["new_preproc"].append(time.time() - start_t)
+                # start_t = time.time()
+                # scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
+                # method_to_durations["new_preproc"].append(time.time() - start_t)
 
-                start_t = time.time()
-                preprocessing_module.forward_series(inputs_series, ludwig_model.backend)
-                method_to_durations["series_preproc"].append(time.time() - start_t)
+                # start_t = time.time()
+                # preprocessing_module.forward_series(inputs_series, ludwig_model.backend)
+                # method_to_durations["series_preproc"].append(time.time() - start_t)
 
                 # start_t = time.time()
                 # preprocess_for_prediction(
                 #     ludwig_model.config,
                 #     dataset=inputs_df,
                 #     training_set_metadata=ludwig_model.training_set_metadata,
                 #     include_outputs=False,
                 #     backend=ludwig_model.backend,
                 # )
                 # method_to_durations["ludwig_preproc"].append(time.time() - start_t)
 
                 # start_t = time.time()
                 # scripted_model.preprocessor_forward(inputs_dict)
                 # method_to_durations["ts_inf_preproc"].append(time.time() - start_t)
 
-                # start_t = time.time()
-                # ludwig_model.predict(inputs_df)
-                # method_to_durations["ludwig_predict"].append(time.time() - start_t)
+                start_t = time.time()
+                ludwig_model.predict(inputs_df)
+                method_to_durations["ludwig_predict"].append(time.time() - start_t)
 
-                # start_t = time.time()
-                # scripted_model(inputs_dict)
-                # method_to_durations["ts_inf_predict"].append(time.time() - start_t)
+                start_t = time.time()
+                scripted_model(inputs_dict)
+                method_to_durations["ts_inf_predict"].append(time.time() - start_t)
 
             print()
             print(f"Batch size: {batch_size}")
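Patch 4 above is where the speedup in the subject line comes from: the sequential per-sample loop in `forward` becomes a fan-out over `torch.jit.fork` followed by a join with `torch.jit.wait`, so samples are processed on TorchScript's inter-op thread pool rather than one after another. A self-contained sketch of the same fork/join pattern; `slow_identity` is an artificial stand-in for `_process_sequence`, not Ludwig code:

from typing import List

import torch


@torch.jit.script
def slow_identity(x: torch.Tensor) -> torch.Tensor:
    # Busy-work standing in for per-sample tokenization and encoding.
    for _ in range(1000):
        x = x + 0.0
    return x


@torch.jit.script
def parallel_map(xs: List[torch.Tensor]) -> List[torch.Tensor]:
    # Fan out: one asynchronous task per sample.
    futures: List[torch.jit.Future[torch.Tensor]] = []
    for x in xs:
        futures.append(torch.jit.fork(slow_identity, x))
    # Join: wait() blocks until the corresponding task finishes.
    results: List[torch.Tensor] = []
    for f in futures:
        results.append(torch.jit.wait(f))
    return results


print(torch.stack(parallel_map([torch.full((8,), float(i)) for i in range(16)])).shape)

Note that asynchronous execution is only guaranteed inside scripted code; invoked from plain Python, fork does not run tasks in parallel, which is why the benchmark above exercises the scripted module.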
From 93d281d8d3c4447989b2546d140b5773bbf55941 Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Tue, 28 Jun 2022 23:00:07 +0200
Subject: [PATCH 5/9] clean up for PR review

---
 ludwig/features/sequence_feature.py           |  45 -----
 .../test_sequence_features_throughput.py      | 165 ------------------
 2 files changed, 210 deletions(-)
 delete mode 100644 tests/ludwig/features/test_sequence_features_throughput.py

diff --git a/ludwig/features/sequence_feature.py b/ludwig/features/sequence_feature.py
index 0a0e8ffb566..45a834789e2 100644
--- a/ludwig/features/sequence_feature.py
+++ b/ludwig/features/sequence_feature.py
@@ -87,45 +87,6 @@ def __init__(self, metadata: Dict[str, Any]):
         self.unit_to_id = metadata["str2idx"]
         self.computed_fill_value = metadata["preprocessing"]["computed_fill_value"]
 
-    @torch.jit.export
-    def forward_old(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
-        """Takes a list of strings and returns a tensor of token ids."""
-        if not torch.jit.isinstance(v, List[str]):
-            raise ValueError(f"Unsupported input: {v}")
-
-        v = [self.computed_fill_value if s == "nan" else s for s in v]
-
-        if self.lowercase:
-            sequences = [sequence.lower() for sequence in v]
-        else:
-            sequences = v
-
-        unit_sequences = self.tokenizer(sequences)
-        # refines type of unit_sequences from Any to List[List[str]]
-        assert torch.jit.isinstance(unit_sequences, List[List[str]]), "unit_sequences is not a list of lists."
-
-        sequence_matrix = torch.full(
-            [len(unit_sequences), self.max_sequence_length], self.unit_to_id[self.padding_symbol]
-        )
-        sequence_matrix[:, 0] = self.unit_to_id[self.start_symbol]
-        for sample_idx, unit_sequence in enumerate(unit_sequences):
-            # Add <EOS> if sequence length is less than max_sequence_length. Else, truncate to max_sequence_length.
-            if len(unit_sequence) + 1 < self.max_sequence_length:
-                sequence_length = len(unit_sequence)
-                sequence_matrix[sample_idx][len(unit_sequence) + 1] = self.unit_to_id[self.stop_symbol]
-            else:
-                sequence_length = self.max_sequence_length - 1
-
-            for i in range(sequence_length):
-                curr_unit = unit_sequence[i]
-                if curr_unit in self.unit_to_id:
-                    curr_id = self.unit_to_id[curr_unit]
-                else:
-                    curr_id = self.unit_to_id[self.unknown_symbol]
-                sequence_matrix[sample_idx][i + 1] = curr_id
-
-        return sequence_matrix
-
     def forward(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
         """Takes a list of strings and returns a tensor of token ids."""
         if not torch.jit.isinstance(v, List[str]):
@@ -146,12 +107,6 @@ def forward(self, v: TorchscriptPreprocessingInput) -> torch.Tensor:
 
         return torch.stack(sequence_matrix)
 
-    @torch.jit.unused
-    def forward_series(self, column, backend) -> torch.Tensor:
-        column = backend.df_engine.map_objects(column, self._process_sequence)
-        sequence_matrix = backend.df_engine.compute(column).values.tolist()
-        return torch.stack(sequence_matrix)
-
     def _process_sequence(self, sequence: str) -> torch.Tensor:
         sequence = self.computed_fill_value if sequence == "nan" else sequence
 

diff --git a/tests/ludwig/features/test_sequence_features_throughput.py b/tests/ludwig/features/test_sequence_features_throughput.py
deleted file mode 100644
index 822eab9f5af..00000000000
--- a/tests/ludwig/features/test_sequence_features_throughput.py
+++ /dev/null
@@ -1,165 +0,0 @@
-import collections
-import random
-import time
-import os
-
-import numpy as np
-import pandas as pd
-import torch
-import yaml
-from tqdm import tqdm
-
-from ludwig.api import LudwigModel
-from ludwig.data.preprocessing import preprocess_for_prediction
-from ludwig.constants import TRAINER, NAME, TYPE
-from ludwig.features.feature_registries import input_type_registry
-from ludwig.models.inference import to_inference_module_input_from_dataframe
-from ludwig.utils.misc_utils import get_from_registry
-from tests.integration_tests.utils import (
-    RAY_BACKEND_CONFIG,
-    binary_feature,
-    generate_data,
-    init_backend,
-    text_feature,
-)
-
-MODE = "AGNEWS"
-AGNEWS_CONFIG_STR = """
-input_features:
-  - name: description
-    type: text
-output_features:
-  - name: class_index
-    type: category
-trainer:
-  batch_size: 1024
-  train_steps: 1
-  steps_per_checkpoint: 1
-  early_stop: 0
-backend:
-  type: ray
-  processor:
-    type: dask
-  trainer:
-    num_workers: 1
-"""
-# AGNEWS_CSV_PATH = "/Users/geoffreyangus/.ludwig_cache/agnews_1.0/processed/agnews_tiny.csv"
-AGNEWS_CSV_PATH = "/home/ray/agnews_tiny.csv"
-
-
-def test_text_preproc_module_space_punct_tokenizer_speed(tmpdir):
-    if MODE == "AGNEWS":
-        config = yaml.safe_load(AGNEWS_CONFIG_STR)
-        feature_config = config["input_features"][0]
-        data_csv_path = AGNEWS_CSV_PATH
-    else:
-        feature_config = text_feature()
-        input_features = [feature_config]
-        output_features = [binary_feature()]
-        config = {"input_features": input_features, "output_features": output_features, TRAINER: {"epochs": 1}}
-        data_csv_path = generate_data(
-            input_features, output_features, os.path.join(tmpdir, "data.csv"), num_examples=100
-        )
-
-    backend = "ray"
-    with init_backend(backend):
-        if backend == "ray":
-            backend = RAY_BACKEND_CONFIG
-            backend["processor"]["type"] = "dask"
-
-        ludwig_model = LudwigModel(config, backend=backend)
-        _, _, output_directory = ludwig_model.train(
-            dataset=data_csv_path,
-            output_directory=os.path.join(tmpdir, "output"),
-            skip_save_processed_input=True,
-        )
-        config = ludwig_model.config
-
-        feature = get_from_registry(feature_config[TYPE], input_type_registry)
-        preprocessing_module = feature.create_preproc_module(ludwig_model.training_set_metadata[feature_config[NAME]])
-        scripted_preprocessing_module = torch.jit.script(preprocessing_module)
-
-        scripted_model = ludwig_model.to_torchscript()
-        scripted_model_path = os.path.join(tmpdir, "inference_module.pt")
-        torch.jit.save(scripted_model, scripted_model_path)
-        scripted_model = torch.jit.load(scripted_model_path)
-
-        model_path = os.path.join(output_directory, "model")
-        ludwig_model = LudwigModel.load(model_path, backend="local")
-
-        df = pd.read_csv(data_csv_path)
-        batch_sizes = [
-            1,
-            2,
-            4,
-            8,
-            16,
-            32,
-            64,
-            128,
-            256,
-            512,
-            1024,
-            2048,
-        ]
-
-        print("warming up...")
-        for _ in tqdm(range(10)):
-            batch_size = random.choice(batch_sizes)
-            inputs_df = df.sample(n=batch_size, replace=True)
-
-            inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
-            scripted_model(inputs_dict)
-            # scripted_model.preprocessor_forward(inputs_dict)
-            # scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
-            # scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
-
-        print("benchmarking...")
-        for batch_size in batch_sizes:
-            inputs_df = df.sample(n=batch_size, replace=True)
-            inputs_dict = to_inference_module_input_from_dataframe(inputs_df, config)
-            inputs_series = ludwig_model.backend.df_engine.from_pandas(inputs_df)[feature_config[NAME]]
-
-            method_to_durations = collections.defaultdict(list)
-
-            for _ in tqdm(range(10)):
-                # start_t = time.time()
-                # scripted_preprocessing_module.forward_old(inputs_dict[feature_config[NAME]])
-                # method_to_durations["old_preproc"].append(time.time() - start_t)
-
-                # start_t = time.time()
-                # scripted_preprocessing_module(inputs_dict[feature_config[NAME]])
-                # method_to_durations["new_preproc"].append(time.time() - start_t)
-
-                # start_t = time.time()
-                # preprocessing_module.forward_series(inputs_series, ludwig_model.backend)
-                # method_to_durations["series_preproc"].append(time.time() - start_t)
-
-                # start_t = time.time()
-                # preprocess_for_prediction(
-                #     ludwig_model.config,
-                #     dataset=inputs_df,
-                #     training_set_metadata=ludwig_model.training_set_metadata,
-                #     include_outputs=False,
-                #     backend=ludwig_model.backend,
-                # )
-                # method_to_durations["ludwig_preproc"].append(time.time() - start_t)
-
-                # start_t = time.time()
-                # scripted_model.preprocessor_forward(inputs_dict)
-                # method_to_durations["ts_inf_preproc"].append(time.time() - start_t)
-
-                start_t = time.time()
-                ludwig_model.predict(inputs_df)
-                method_to_durations["ludwig_predict"].append(time.time() - start_t)
-
-                start_t = time.time()
-                scripted_model(inputs_dict)
-                method_to_durations["ts_inf_predict"].append(time.time() - start_t)
-
-            print()
-            print(f"Batch size: {batch_size}")
-            for method, durations in method_to_durations.items():
-                print(f"\t{method}:\t{np.mean(durations):.8f} +/- {np.std(durations):.8f}")
-
-    print("done.")

From 472e1c0705e9c9b3b753f8a88d9859da13b6ea5a Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Wed, 29 Jun 2022 01:30:48 +0200
Subject: [PATCH 6/9] fine-grained control of no_grad to account for
 parallelism

---
 ludwig/models/inference.py | 74 ++++++++++++++++----------------
 1 file changed, 34 insertions(+), 40 deletions(-)

diff --git a/ludwig/models/inference.py b/ludwig/models/inference.py
index 5e89dffe941..e5b16556d03 100644
--- a/ludwig/models/inference.py
+++ b/ludwig/models/inference.py
@@ -56,8 +56,7 @@ def __init__(
 
     def preprocessor_forward(self, inputs: Dict[str, TorchscriptPreprocessingInput]) -> Dict[str, torch.Tensor]:
         """Forward pass through the preprocessor."""
-        with torch.no_grad():
-            return self.preprocessor(inputs)
+        return self.preprocessor(inputs)
 
     def predictor_forward(self, preproc_inputs: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
         """Forward pass through the predictor.
@@ -67,24 +66,22 @@ def predictor_forward(self, preproc_inputs: Dict[str, torch.Tensor]) -> Dict[str
         for k, v in preproc_inputs.items():
             preproc_inputs[k] = v.to(self.predictor.device)
 
-        with torch.no_grad():
+        with torch.no_grad():  # Ensure model params do not store gradients
             predictions_flattened = self.predictor(preproc_inputs)
         return predictions_flattened
 
     def postprocessor_forward(self, predictions_flattened: Dict[str, torch.Tensor]) -> Dict[str, Dict[str, Any]]:
         """Forward pass through the postprocessor."""
-        with torch.no_grad():
-            postproc_outputs_flattened: Dict[str, Any] = self.postprocessor(predictions_flattened)
-            # Turn flat inputs into nested predictions per feature name
-            postproc_outputs: Dict[str, Dict[str, Any]] = _unflatten_dict_by_feature_name(postproc_outputs_flattened)
-            return postproc_outputs
+        postproc_outputs_flattened: Dict[str, Any] = self.postprocessor(predictions_flattened)
+        # Turn flat inputs into nested predictions per feature name
+        postproc_outputs: Dict[str, Dict[str, Any]] = _unflatten_dict_by_feature_name(postproc_outputs_flattened)
+        return postproc_outputs
 
     def forward(self, inputs: Dict[str, TorchscriptPreprocessingInput]) -> Dict[str, Dict[str, Any]]:
-        with torch.no_grad():
-            preproc_inputs: Dict[str, torch.Tensor] = self.preprocessor_forward(inputs)
-            predictions_flattened: Dict[str, torch.Tensor] = self.predictor_forward(preproc_inputs)
-            postproc_outputs: Dict[str, Dict[str, Any]] = self.postprocessor_forward(predictions_flattened)
-            return postproc_outputs
+        preproc_inputs: Dict[str, torch.Tensor] = self.preprocessor_forward(inputs)
+        predictions_flattened: Dict[str, torch.Tensor] = self.predictor_forward(preproc_inputs)
+        postproc_outputs: Dict[str, Dict[str, Any]] = self.postprocessor_forward(predictions_flattened)
+        return postproc_outputs
 
     @torch.jit.unused
     def predict(
@@ -172,12 +169,11 @@ def __init__(self, config: Dict[str, Any], training_set_metadata: Dict[str, Any]
             self.preproc_modules[module_dict_key] = feature.create_preproc_module(training_set_metadata[feature_name])
 
     def forward(self, inputs: Dict[str, TorchscriptPreprocessingInput]) -> Dict[str, torch.Tensor]:
-        with torch.no_grad():
-            preproc_inputs = {}
-            for module_dict_key, preproc in self.preproc_modules.items():
-                feature_name = get_name_from_module_dict_key(module_dict_key)
-                preproc_inputs[feature_name] = preproc(inputs[feature_name])
-            return preproc_inputs
+        preproc_inputs = {}
+        for module_dict_key, preproc in self.preproc_modules.items():
+            feature_name = get_name_from_module_dict_key(module_dict_key)
+            preproc_inputs[feature_name] = preproc(inputs[feature_name])
+        return preproc_inputs
 
 
 class _InferencePredictor(nn.Module):
@@ -200,17 +196,16 @@ def __init__(self, model: "ECD", device: TorchDevice):
             self.predict_modules[module_dict_key] = feature.prediction_module.to(device=self.device)
 
     def forward(self, preproc_inputs: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
-        with torch.no_grad():
-            model_outputs = self.model(preproc_inputs)
-            predictions_flattened: Dict[str, torch.Tensor] = {}
-            for module_dict_key, predict in self.predict_modules.items():
-                feature_name = get_name_from_module_dict_key(module_dict_key)
-                feature_predictions = predict(model_outputs, feature_name)
-                # Flatten out the predictions to support Triton input/output
-                for predict_key, tensor_values in feature_predictions.items():
-                    predict_concat_key = output_feature_utils.get_feature_concat_name(feature_name, predict_key)
-                    predictions_flattened[predict_concat_key] = tensor_values
-            return predictions_flattened
+        model_outputs = self.model(preproc_inputs)
+        predictions_flattened: Dict[str, torch.Tensor] = {}
+        for module_dict_key, predict in self.predict_modules.items():
+            feature_name = get_name_from_module_dict_key(module_dict_key)
+            feature_predictions = predict(model_outputs, feature_name)
+            # Flatten out the predictions to support Triton input/output
+            for predict_key, tensor_values in feature_predictions.items():
+                predict_concat_key = output_feature_utils.get_feature_concat_name(feature_name, predict_key)
+                predictions_flattened[predict_concat_key] = tensor_values
+        return predictions_flattened
 
 
 class _InferencePostprocessor(nn.Module):
@@ -231,16 +226,15 @@ def __init__(self, model: "ECD", training_set_metadata: Dict[str, Any]):
            self.postproc_modules[module_dict_key] = feature.create_postproc_module(training_set_metadata[feature_name])
 
     def forward(self, predictions_flattened: Dict[str, torch.Tensor]) -> Dict[str, Any]:
-        with torch.no_grad():
-            postproc_outputs_flattened: Dict[str, Any] = {}
-            for module_dict_key, postproc in self.postproc_modules.items():
-                feature_name = get_name_from_module_dict_key(module_dict_key)
-                feature_postproc_outputs = postproc(predictions_flattened, feature_name)
-                # Flatten out the predictions to support Triton input/output
-                for postproc_key, tensor_values in feature_postproc_outputs.items():
-                    postproc_concat_key = output_feature_utils.get_feature_concat_name(feature_name, postproc_key)
-                    postproc_outputs_flattened[postproc_concat_key] = tensor_values
-            return postproc_outputs_flattened
+        postproc_outputs_flattened: Dict[str, Any] = {}
+        for module_dict_key, postproc in self.postproc_modules.items():
+            feature_name = get_name_from_module_dict_key(module_dict_key)
+            feature_postproc_outputs = postproc(predictions_flattened, feature_name)
+            # Flatten out the predictions to support Triton input/output
+            for postproc_key, tensor_values in feature_postproc_outputs.items():
+                postproc_concat_key = output_feature_utils.get_feature_concat_name(feature_name, postproc_key)
+                postproc_outputs_flattened[postproc_concat_key] = tensor_values
+        return postproc_outputs_flattened
 
 
 def save_ludwig_model_for_inference(
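Patch 6 above narrows `torch.no_grad()` from wrapping every stage of the inference module to wrapping only the predictor call that actually touches model parameters. A plausible reading of the subject line, given the fork-based preprocessor from patch 4: PyTorch's grad mode is a thread-local flag, so a `no_grad()` block on the calling thread does not automatically cover work running on other threads. The sketch below demonstrates the thread-local behavior with plain Python threads, as an analogy for the scheduling concern rather than the exact TorchScript execution path:

import threading

import torch


def report(label: str) -> None:
    # Each thread sees its own grad-mode flag, not the caller's.
    print(label, "grad enabled:", torch.is_grad_enabled())


with torch.no_grad():
    report("caller thread")  # grad enabled: False
    t = threading.Thread(target=report, args=("worker thread",))
    t.start()
    t.join()  # worker prints: grad enabled: True

With `no_grad()` applied inside `predictor_forward` itself, gradient tracking is disabled exactly where the parameters are used, regardless of which thread runs the surrounding stages.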
From be420b0682e752f17e9a4e115695bc9dff9fce03 Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Wed, 29 Jun 2022 01:32:27 +0200
Subject: [PATCH 7/9] nit

---
 ludwig/models/inference.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ludwig/models/inference.py b/ludwig/models/inference.py
index e5b16556d03..bf24808d5b4 100644
--- a/ludwig/models/inference.py
+++ b/ludwig/models/inference.py
@@ -66,7 +66,7 @@ def predictor_forward(self, preproc_inputs: Dict[str, torch.Tensor]) -> Dict[str
         for k, v in preproc_inputs.items():
             preproc_inputs[k] = v.to(self.predictor.device)
 
-        with torch.no_grad():  # Ensure model params do not store gradients
+        with torch.no_grad():  # Ensure model params do not compute gradients
             predictions_flattened = self.predictor(preproc_inputs)
         return predictions_flattened
 

From d2698a0b477498b5734de926c043cbffb430f3ed Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Wed, 29 Jun 2022 01:37:48 +0200
Subject: [PATCH 8/9] added unit test to check no gradients

---
 tests/integration_tests/test_torchscript.py |  1 +
 tests/integration_tests/utils.py            | 11 +++++++++++
 2 files changed, 12 insertions(+)

diff --git a/tests/integration_tests/test_torchscript.py b/tests/integration_tests/test_torchscript.py
index 7165af1ea73..0053576ce47 100644
--- a/tests/integration_tests/test_torchscript.py
+++ b/tests/integration_tests/test_torchscript.py
@@ -622,6 +622,7 @@ def validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path
             assert output_name in feature_outputs
 
             output_values = feature_outputs[output_name]
+            assert utils.has_no_grad(output_values), f'"{feature_name}.{output_name}" tensors have gradients'
             assert utils.is_all_close(
                 output_values, output_values_expected
             ), f"feature: {feature_name}, output: {output_name}"

diff --git a/tests/integration_tests/utils.py b/tests/integration_tests/utils.py
index 370ff0b7795..96e6328e842 100644
--- a/tests/integration_tests/utils.py
+++ b/tests/integration_tests/utils.py
@@ -501,6 +501,17 @@ def get_weights(model: torch.nn.Module) -> List[torch.Tensor]:
     return [param.data for param in model.parameters()]
 
 
+def has_no_grad(
+    val: Union[np.ndarray, torch.Tensor, str, list],
+):
+    """Checks that the value (including any nested values) carries no gradient."""
+    if isinstance(val, list):
+        return all(has_no_grad(v) for v in val)
+    if isinstance(val, torch.Tensor):
+        return not val.requires_grad
+    return True
+
+
 def is_all_close(
     val1: Union[np.ndarray, torch.Tensor, str, list],
     val2: Union[np.ndarray, torch.Tensor, str, list],

From 76310eb0b69886839aa5dfd7f235bb5d0e2aa0dc Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Wed, 29 Jun 2022 01:38:38 +0200
Subject: [PATCH 9/9] unify assert message format

---
 tests/integration_tests/test_torchscript.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration_tests/test_torchscript.py b/tests/integration_tests/test_torchscript.py
index 0053576ce47..a15f6040dc8 100644
--- a/tests/integration_tests/test_torchscript.py
+++ b/tests/integration_tests/test_torchscript.py
@@ -625,7 +625,7 @@ def validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path
             assert utils.has_no_grad(output_values), f'"{feature_name}.{output_name}" tensors have gradients'
             assert utils.is_all_close(
                 output_values, output_values_expected
-            ), f"feature: {feature_name}, output: {output_name}"
+            ), f'"{feature_name}.{output_name}" tensors are not close to ludwig model'