diff --git a/ludwig/features/audio_feature.py b/ludwig/features/audio_feature.py index dce482c83c5..9011e8a47a1 100644 --- a/ludwig/features/audio_feature.py +++ b/ludwig/features/audio_feature.py @@ -243,10 +243,13 @@ def add_feature_data( 'for audio.') csv_path = None + # this is not super nice, but works both and DFs and lists + first_path = '.' + for first_path in dataset_df[feature['name']]: + break if hasattr(dataset_df, 'csv'): csv_path = os.path.dirname(os.path.abspath(dataset_df.csv)) - if (csv_path is None and - not os.path.isabs(dataset_df[feature['name']][0])): + if csv_path is None and not os.path.isabs(first_path): raise ValueError( 'Audio file paths must be absolute' ) @@ -281,10 +284,10 @@ def add_feature_data( (num_audio_utterances, max_length, feature_dim), dtype=np.float32 ) - for i in range(len(dataset_df)): + for i, path in enumerate(dataset_df[feature['name']]): filepath = get_abs_path( csv_path, - dataset_df[feature['name']][i] + path ) audio_feature = AudioBaseFeature._read_audio_and_transform_to_feature( filepath, audio_feature_dict, feature_dim, max_length, diff --git a/ludwig/features/bag_feature.py b/ludwig/features/bag_feature.py index a654e32bbe5..e8f1d79340b 100644 --- a/ludwig/features/bag_feature.py +++ b/ludwig/features/bag_feature.py @@ -25,7 +25,6 @@ from ludwig.features.base_feature import InputFeature from ludwig.features.feature_utils import set_str_to_idx from ludwig.models.modules.bag_encoders import BagEmbedWeightedEncoder -from ludwig.models.modules.embedding_modules import EmbedWeighted from ludwig.utils.misc import set_default_value from ludwig.utils.strings_utils import create_vocabulary @@ -70,13 +69,14 @@ def feature_data(column, metadata, preprocessing_parameters): dtype=float ) - for i in range(len(column)): + for i, set_str in enumerate(column): col_counter = Counter(set_str_to_idx( - column[i], + set_str, metadata['str2idx'], preprocessing_parameters['tokenizer']) ) - bag_matrix[i, list(col_counter.keys())] = list(col_counter.values()) + bag_matrix[i, list(col_counter.keys())] = list( + col_counter.values()) return bag_matrix diff --git a/ludwig/features/base_feature.py b/ludwig/features/base_feature.py index 598fcc45202..ae735246c03 100644 --- a/ludwig/features/base_feature.py +++ b/ludwig/features/base_feature.py @@ -168,11 +168,12 @@ def call( ): # account for output feature target if isinstance(inputs, tuple): - inputs, target = inputs + local_inputs, target = inputs else: + local_inputs = inputs target = None - combiner_outputs, other_output_hidden = inputs + combiner_outputs, other_output_hidden = local_inputs # extract the combined hidden layer combiner_output = combiner_outputs['combiner_output'] @@ -389,6 +390,5 @@ def prepare_decoder_inputs( training=training, mask=mask ) - other_output_features[self.feature_name] = feature_hidden return feature_hidden diff --git a/ludwig/features/binary_feature.py b/ludwig/features/binary_feature.py index 18c442caeb2..a582a0170ca 100644 --- a/ludwig/features/binary_feature.py +++ b/ludwig/features/binary_feature.py @@ -35,7 +35,7 @@ from ludwig.utils.metrics_utils import precision_recall_curve from ludwig.utils.metrics_utils import roc_auc_score from ludwig.utils.metrics_utils import roc_curve -from ludwig.utils.misc import set_default_value, get_from_registry +from ludwig.utils.misc import set_default_value from ludwig.utils.misc import set_default_values logger = logging.getLogger(__name__) @@ -79,12 +79,7 @@ def __init__(self, feature, encoder_obj=None): if encoder_obj: self.encoder_obj = encoder_obj else: - self.encoder_obj = self.get_binary_encoder(feature) - - def get_binary_encoder(self, encoder_parameters): - return get_from_registry(self.encoder, self.encoder_registry)( - **encoder_parameters - ) + self.encoder_obj = self.initialize_encoder(feature) def call(self, inputs, training=None, mask=None): assert isinstance(inputs, tf.Tensor) diff --git a/ludwig/features/image_feature.py b/ludwig/features/image_feature.py index 5612548cff9..e7ea37e8ecb 100644 --- a/ludwig/features/image_feature.py +++ b/ludwig/features/image_feature.py @@ -252,11 +252,15 @@ def add_feature_data( if num_images == 0: raise ValueError('There are no images in the dataset provided.') - first_image_path = dataset_df[feature['name']][0] - if csv_path is None and not os.path.isabs(first_image_path): + # this is not super nice, but works both and DFs and lists + first_path = '.' + for first_path in dataset_df[feature['name']]: + break + + if csv_path is None and not os.path.isabs(first_path): raise ValueError('Image file paths must be absolute') - first_image_path = get_abs_path(csv_path, first_image_path) + first_path = get_abs_path(csv_path, first_path) ( should_resize, @@ -266,7 +270,7 @@ def add_feature_data( user_specified_num_channels, first_image ) = ImageBaseFeature._finalize_preprocessing_parameters( - preprocessing_parameters, first_image_path + preprocessing_parameters, first_path ) metadata[feature['name']]['preprocessing']['height'] = height diff --git a/ludwig/features/set_feature.py b/ludwig/features/set_feature.py index 6ff85183f23..8d7c68d8f94 100644 --- a/ludwig/features/set_feature.py +++ b/ludwig/features/set_feature.py @@ -20,7 +20,6 @@ import numpy as np import tensorflow as tf - from tensorflow.keras.metrics import MeanIoU from ludwig.constants import * @@ -28,11 +27,10 @@ from ludwig.features.base_feature import InputFeature from ludwig.features.base_feature import OutputFeature from ludwig.features.feature_utils import set_str_to_idx -from ludwig.models.modules.initializer_modules import get_initializer from ludwig.models.modules.loss_modules import SigmoidCrossEntropyLoss from ludwig.models.modules.metric_modules import SigmoidCrossEntropyMetric -from ludwig.models.modules.set_encoders import SetSparseEncoder from ludwig.models.modules.set_decoders import Classifier +from ludwig.models.modules.set_encoders import SetSparseEncoder from ludwig.utils.misc import set_default_value from ludwig.utils.strings_utils import create_vocabulary @@ -168,12 +166,15 @@ def __init__(self, feature): def logits( self, inputs, # hidden + **kwargs ): - return self.decoder_obj(inputs) + hidden = inputs[HIDDEN] + return self.decoder_obj(hidden) def predictions( self, inputs, # logits + **kwargs ): logits = inputs[LOGITS] @@ -284,9 +285,10 @@ def postprocess_results( del result[PREDICTIONS] if PROBABILITIES in result and len(result[PROBABILITIES]) > 0: - probs = result[PROBABILITIES] + probs = result[PROBABILITIES].numpy() prob = [[prob for prob in prob_set if - prob >= output_feature['threshold']] for prob_set in probs] + prob >= output_feature['threshold']] for prob_set in + probs] postprocessed[PROBABILITIES] = probs postprocessed[PROBABILITY] = prob diff --git a/ludwig/features/vector_feature.py b/ludwig/features/vector_feature.py index 6add10af8a5..d3b5d30543c 100644 --- a/ludwig/features/vector_feature.py +++ b/ludwig/features/vector_feature.py @@ -18,7 +18,6 @@ import os import numpy as np - import tensorflow as tf from tensorflow.keras.losses import MeanAbsoluteError from tensorflow.keras.losses import MeanSquaredError @@ -167,7 +166,7 @@ def call(self, inputs, training=None, mask=None): inputs, training=training, mask=mask ) - return {'encoder_outputs': inputs_encoded} + return inputs_encoded @staticmethod def update_model_definition_with_metadata( @@ -210,12 +209,15 @@ def __init__(self, feature): def logits( self, inputs, # hidden + **kwargs ): - return self.decoder_obj(inputs) + hidden = inputs[HIDDEN] + return self.decoder_obj(hidden) def predictions( self, inputs, # logits + **kwargs ): return {PREDICTIONS: inputs[LOGITS], LOGITS: inputs[LOGITS]} @@ -286,7 +288,7 @@ def postprocess_results( name = output_feature['name'] if PREDICTIONS in result and len(result[PREDICTIONS]) > 0: - postprocessed[PREDICTIONS] = result[PREDICTIONS] + postprocessed[PREDICTIONS] = result[PREDICTIONS].numpy() if not skip_save_unprocessed_output: np.save( npy_filename.format(name, PREDICTIONS), diff --git a/ludwig/models/model.py b/ludwig/models/model.py index 0badf2131a9..b5d062d403b 100644 --- a/ludwig/models/model.py +++ b/ludwig/models/model.py @@ -24,15 +24,18 @@ import copy import logging import os +import os.path +import pickle import re import signal import sys import threading +import tempfile import time from collections import OrderedDict import tensorflow as tf -# import tensorflow as tf2 # todo: tf2 port + from tabulate import tabulate from tqdm import tqdm @@ -160,8 +163,8 @@ def evaluation_step(self, model, inputs, targets): return predictions @tf.function - def predict_step(self, model, x): - return model(x, training=False) + def predict_step(self, model, inputs): + return model.predictions(inputs, output_features=None) # def initialize_session(self, gpus=None, gpu_fraction=1): # if self.session is None: @@ -379,8 +382,8 @@ def train( # ====== Setup session ======= # todo tf2: reintroduce restoring weights - # if self.weights_save_path: - # self.restore(session, self.weights_save_path) + if self.weights_save_path: + self.restore(self.weights_save_path) # todo tf2: reintroduce tensorboard logging # train_writer = None @@ -406,13 +409,12 @@ def train( model_weights_path ) # todo tf3: reintroduce session resume - # if is_on_master(): - # self.resume_session( - # session, - # save_path, - # model_weights_path, - # model_weights_progress_path - # ) + if is_on_master(): + self.resume_session( + save_path, + model_weights_path, + model_weights_progress_path + ) else: ( train_metrics, @@ -450,6 +452,7 @@ def train( # ================ Training Loop ================ while progress_tracker.epoch < self.epochs: + print(">>>> progress tracker epoch", progress_tracker.epoch) # epoch init start_time = time.time() if is_on_master(): @@ -651,52 +654,51 @@ def train( ) ) - # if should_validate: - # should_break = self.check_progress_on_validation( - # progress_tracker, - # validation_field, - # validation_metric, - # session, - # model_weights_path, - # model_hyperparameters_path, - # reduce_learning_rate_on_plateau, - # reduce_learning_rate_on_plateau_patience, - # reduce_learning_rate_on_plateau_rate, - # increase_batch_size_on_plateau_patience, - # increase_batch_size_on_plateau, - # increase_batch_size_on_plateau_max, - # increase_batch_size_on_plateau_rate, - # early_stop, - # skip_save_model - # ) - # if should_break: - # break - # else: - # # there's no validation, so we save the model at each iteration - # if is_on_master(): - # if not skip_save_model: - # self.save_weights(session, model_weights_path) - # self.save_hyperparameters( - # self.hyperparameters, - # model_hyperparameters_path - # ) - # - # # ========== Save training progress ========== - # if is_on_master(): - # if not skip_save_progress: - # self.save_weights(session, model_weights_progress_path) - # progress_tracker.save( - # os.path.join( - # save_path, - # TRAINING_PROGRESS_FILE_NAME - # ) - # ) - # if skip_save_model: - # self.save_hyperparameters( - # self.hyperparameters, - # model_hyperparameters_path - # ) - # + if should_validate: + should_break = self.check_progress_on_validation( + progress_tracker, + validation_field, + validation_metric, + model_weights_path, + model_hyperparameters_path, + reduce_learning_rate_on_plateau, + reduce_learning_rate_on_plateau_patience, + reduce_learning_rate_on_plateau_rate, + increase_batch_size_on_plateau_patience, + increase_batch_size_on_plateau, + increase_batch_size_on_plateau_max, + increase_batch_size_on_plateau_rate, + early_stop, + skip_save_model + ) + if should_break: + break + else: + # there's no validation, so we save the model at each iteration + if is_on_master(): + if not skip_save_model: + self.save_weights(model_weights_path) + self.save_hyperparameters( + self.hyperparameters, + model_hyperparameters_path + ) + + # ========== Save training progress ========== + if is_on_master(): + if not skip_save_progress: + self.save_weights(model_weights_progress_path) + progress_tracker.save( + os.path.join( + save_path, + TRAINING_PROGRESS_FILE_NAME + ) + ) + if skip_save_model: + self.save_hyperparameters( + self.hyperparameters, + model_hyperparameters_path + ) + # if is_on_master(): # contrib_command("train_epoch_end", progress_tracker) # logger.info('') @@ -757,7 +759,12 @@ def append_metrics(self, dataset_name, results, metrics_log, tables): for output_feature in self.ecd.output_features: scores = [dataset_name] - for metric in metrics_log[output_feature]: + # collect metric names based on output features metrics to + # ensure consistent order of reporting metrics + metric_names = self.ecd.output_features[output_feature]\ + .metric_functions.keys() + + for metric in metric_names: score = results[output_feature][metric] metrics_log[output_feature][metric].append(score) scores.append(score) @@ -802,15 +809,24 @@ def batch_evaluation( # create array for predictors # todo: tf2 need to handle case of single predictor, e.g., image inputs = {i_feat['name']: batch[i_feat['name']] for i_feat in self.hyperparameters['input_features']} - targets = {o_feat['name']: batch[o_feat['name']] for o_feat in self.hyperparameters['output_features']} - ( - preds - ) = self.evaluation_step( - self.ecd, - inputs, - targets - ) + if only_predictions: + ( + preds + ) = self.predict_step( + self.ecd, + inputs + ) + else: + targets = {o_feat['name']: batch[o_feat['name']] for o_feat in self.hyperparameters['output_features']} + + ( + preds + ) = self.evaluation_step( + self.ecd, + inputs, + targets + ) # accumulate predictions from batch for each output feature for of_name, of_preds in preds.items(): @@ -839,9 +855,12 @@ def batch_evaluation( for pred_name, pred_value_list in of_predictions.items(): predictions[of_name][pred_name] = tf.concat(pred_value_list, axis=0) - metrics = self.ecd.get_metrics() - self.ecd.reset_metrics() - return metrics, predictions + if only_predictions: + return predictions + else: + metrics = self.ecd.get_metrics() + self.ecd.reset_metrics() + return metrics, predictions def evaluation( self, @@ -932,7 +951,7 @@ def check_progress_on_validation( progress_tracker, validation_field, validation_metric, - session, model_weights_path, + model_weights_path, model_hyperparameters_path, reduce_learning_rate_on_plateau, reduce_learning_rate_on_plateau_patience, @@ -957,7 +976,7 @@ def check_progress_on_validation( validation_field][validation_metric][-1] if is_on_master(): if not skip_save_model: - self.save_weights(session, model_weights_path) + self.save_weights(model_weights_path) self.save_hyperparameters( self.hyperparameters, model_hyperparameters_path @@ -1028,14 +1047,22 @@ def predict( **kwargs ): # predict - eval_metrics, eval_predictions = self.batch_evaluation( - dataset, - batch_size, - collect_predictions=True, - only_predictions=not evaluate_performance - ) - - return eval_metrics, eval_predictions + if evaluate_performance: + eval_metrics, eval_predictions = self.batch_evaluation( + dataset, + batch_size, + collect_predictions=True, + only_predictions=not evaluate_performance + ) + return eval_metrics, eval_predictions + else: + eval_predictions = self.batch_evaluation( + dataset, + batch_size, + collect_predictions=True, + only_predictions=not evaluate_performance + ) + return eval_predictions def collect_activations( self, @@ -1111,10 +1138,9 @@ def collect_weights( # return collected_tensors pass - def save_weights(self, session, save_path): - # todo tf2: reintroduce functionality - # self.weights_save_path = self.saver.save(session, save_path) - pass + def save_weights(self, save_path): + # save model + self.ecd.save_weights(save_path) def save_hyperparameters(self, hyperparameters, save_path): # removing pretrained embeddings paths from hyperparameters @@ -1159,10 +1185,8 @@ def save_savedmodel(self, save_path): # builder.save() pass - def restore(self, session, weights_path): - # todo tf2: reintroduce this functionality - # self.saver.restore(session, weights_path) - pass + def restore(self, weights_path): + self.ecd.load_weights(weights_path) @staticmethod def load(load_path, use_horovod=False): @@ -1176,6 +1200,7 @@ def load(load_path, use_horovod=False): load_path, MODEL_WEIGHTS_FILE_NAME ) + model.restore(model.weights_save_path) return model def set_epochs_to_1_or_quit(self, signum, frame): @@ -1298,7 +1323,6 @@ def initialize_batcher( def resume_session( self, - session, save_path, model_weights_path, model_weights_progress_path @@ -1309,9 +1333,9 @@ def resume_session( if pattern.match(file_path): num_matching_files += 1 if num_matching_files == 3: - self.restore(session, model_weights_progress_path) + self.restore(model_weights_progress_path) else: - self.restore(session, model_weights_path) + self.restore(model_weights_path) def reduce_learning_rate( self, diff --git a/ludwig/models/modules/generic_decoders.py b/ludwig/models/modules/generic_decoders.py index 3b56669a497..f68aeac3644 100644 --- a/ludwig/models/modules/generic_decoders.py +++ b/ludwig/models/modules/generic_decoders.py @@ -56,7 +56,7 @@ class Projector(Layer): def __init__( self, - num_classes, + vector_size, use_bias=True, kernel_initializer='glorot_uniform', bias_initializer='zeros', @@ -69,7 +69,7 @@ def __init__( ): super().__init__() self.dense = Dense( - num_classes, + vector_size, use_bias=use_bias, kernel_initializer=kernel_initializer, bias_initializer=bias_initializer, diff --git a/ludwig/models/modules/metric_modules.py b/ludwig/models/modules/metric_modules.py index 912c23665d5..09577e39f6b 100644 --- a/ludwig/models/modules/metric_modules.py +++ b/ludwig/models/modules/metric_modules.py @@ -18,7 +18,8 @@ import tensorflow as tf from ludwig.constants import * -from ludwig.models.modules.loss_modules import BWCEWLoss +from ludwig.models.modules.loss_modules import BWCEWLoss, \ + SigmoidCrossEntropyLoss from ludwig.models.modules.loss_modules import SequenceLoss from ludwig.models.modules.loss_modules import SoftmaxCrossEntropyLoss from ludwig.utils.tf_utils import sequence_length_2D @@ -43,7 +44,7 @@ class R2Score(tf.keras.metrics.Metric): # todo tf2 - convert to tensors? - def __init__(self, name='r2_score'): + def __init__(self, name='r2_score', **kwargs): super(R2Score, self).__init__(name=name) self._reset_states() @@ -81,7 +82,7 @@ class ErrorScore(tf.keras.metrics.Metric): # todo tf2 - convert to tensors? - def __init__(self, name='error_score'): + def __init__(self, name='error_score', **kwargs): super(ErrorScore, self).__init__(name=name) self._reset_states() diff --git a/ludwig/models/modules/sequence_decoders.py b/ludwig/models/modules/sequence_decoders.py index 615a8f96168..0924798d928 100644 --- a/ludwig/models/modules/sequence_decoders.py +++ b/ludwig/models/modules/sequence_decoders.py @@ -455,9 +455,10 @@ def decoder_greedy( # ================ predictions ================= greedy_sampler = tfa.seq2seq.GreedyEmbeddingSampler() - # decoder_input = tf.expand_dims([self.GO_SYMBOL] * batch_size, 1) + decoder_input = tf.expand_dims([self.GO_SYMBOL] * batch_size, 1) start_tokens = tf.fill([batch_size], self.GO_SYMBOL) end_token = self.END_SYMBOL + decoder_inp_emb = self.decoder_embedding(decoder_input) if self.attention_mechanism is not None: self.attention_mechanism.setup_memory( diff --git a/ludwig/predict.py b/ludwig/predict.py index 99eb6a1f064..8a33e1d784a 100644 --- a/ludwig/predict.py +++ b/ludwig/predict.py @@ -99,7 +99,7 @@ def full_predict( gpu_fraction, debug ) - model.close_session() + # model.close_session() # todo tf2 code clean -up if is_on_master(): # setup directories and file names diff --git a/tests/integration_tests/test_model_training_options.py b/tests/integration_tests/test_model_training_options.py new file mode 100644 index 00000000000..c2fcffaeda1 --- /dev/null +++ b/tests/integration_tests/test_model_training_options.py @@ -0,0 +1,356 @@ +import json +import os.path +from collections import namedtuple + +import numpy as np +import pandas as pd +import pytest +from sklearn.model_selection import train_test_split + +from ludwig.api import LudwigModel +from ludwig.data.preprocessing import get_split +from ludwig.experiment import full_experiment +from ludwig.utils.data_utils import split_dataset_tvt, read_csv +from tests.integration_tests.utils import binary_feature, numerical_feature, \ + category_feature, sequence_feature, date_feature, h3_feature, \ + set_feature, generate_data, text_feature, vector_feature, bag_feature, \ + image_feature, audio_feature, timeseries_feature + +GeneratedData = namedtuple('GeneratedData', + 'train_df validation_df test_df') + + +def get_feature_definitions(): + input_features = [ + {'name': 'x', 'type': 'numerical'}, + ] + output_features = [ + {'name': 'y', 'type': 'numerical', 'loss': {'type': 'mean_squared_error'}, + 'num_fc_layers': 5, 'fc_size': 64} + ] + + return input_features, output_features + + +@pytest.fixture(scope='module') +def generated_data(): + # function generates simple training data that guarantee convergence + # within 30 epochs for suitable model definition + NUMBER_OBSERVATIONS = 500 + + # generate data + np.random.seed(43) + x = np.array(range(NUMBER_OBSERVATIONS)).reshape(-1, 1) + y = 2*x + 1 + np.random.normal(size=x.shape[0]).reshape(-1, 1) + raw_df = pd.DataFrame(np.concatenate((x, y), axis=1), columns=['x', 'y']) + + # create training data + train, valid_test = train_test_split(raw_df, train_size=0.7) + + # create validation and test data + validation, test = train_test_split(valid_test, train_size=0.5) + + return GeneratedData(train, validation, test) + +@pytest.mark.parametrize('early_stop', [3, 5]) +def test_early_stopping(early_stop, generated_data, tmp_path): + + input_features, output_features = get_feature_definitions() + + model_definition = { + 'input_features': input_features, + 'output_features': output_features, + 'combiner': { + 'type': 'concat' + }, + 'training': { + 'epochs': 30, + 'early_stop': early_stop, + 'batch_size': 16 + } + } + + # create sub-directory to store results + results_dir = tmp_path / 'results' + results_dir.mkdir() + + # run experiment + exp_dir_name = full_experiment( + data_train_df=generated_data.train_df, + data_validation_df=generated_data.validation_df, + data_test_df=generated_data.test_df, + output_directory=str(results_dir), + model_definition=model_definition, + skip_save_processed_input=True, + skip_save_progress=True, + skip_save_unprocessed_output=True, + skip_save_model=True, + skip_save_log=True + ) + + # test existence of required files + train_stats_fp = os.path.join(exp_dir_name, 'training_statistics.json') + metadata_fp = os.path.join(exp_dir_name, 'description.json') + assert os.path.isfile(train_stats_fp) + assert os.path.isfile(metadata_fp) + + # retrieve results so we can validate early stopping + with open(train_stats_fp,'r') as f: + train_stats = json.load(f) + with open(metadata_fp, 'r') as f: + metadata = json.load(f) + + # get early stopping value + early_stop_value = metadata['model_definition']['training']['early_stop'] + + # retrieve validation losses + vald_losses = np.array(train_stats['validation']['combined']['loss']) + last_epoch = vald_losses.shape[0] + best_epoch = np.argmin(vald_losses) + + # confirm early stopping + assert (last_epoch - best_epoch - 1) == early_stop_value + +@pytest.mark.parametrize('skip_save_progress', [False, True]) +@pytest.mark.parametrize('skip_save_model', [False, True]) +def test_model_progress_save( + skip_save_progress, + skip_save_model, + generated_data, + tmp_path +): + + input_features, output_features = get_feature_definitions() + + model_definition = { + 'input_features': input_features, + 'output_features': output_features, + 'combiner': {'type': 'concat'}, + 'training': {'epochs': 5} + } + + # create sub-directory to store results + results_dir = tmp_path / 'results' + results_dir.mkdir() + + # run experiment + exp_dir_name = full_experiment( + data_train_df=generated_data.train_df, + data_validation_df=generated_data.validation_df, + data_test_df=generated_data.test_df, + output_directory=str(results_dir), + model_definition=model_definition, + skip_save_processed_input=True, + skip_save_progress=skip_save_progress, + skip_save_unprocessed_output=True, + skip_save_model=skip_save_model, + skip_save_log=True + ) + + #========== Check for required result data sets ============= + if skip_save_model: + assert not os.path.isdir( + os.path.join(exp_dir_name, 'model', 'model_weights') + ) + else: + assert os.path.isdir( + os.path.join(exp_dir_name, 'model', 'model_weights') + ) + + if skip_save_progress: + assert not os.path.isdir( + os.path.join(exp_dir_name, 'model', 'model_weights_progress') + ) + else: + assert os.path.isdir( + os.path.join(exp_dir_name, 'model', 'model_weights_progress') + ) + + +# work-in-progress +def test_model_save_resume(generated_data, tmp_path): + + input_features, output_features = get_feature_definitions() + model_definition = { + 'input_features': input_features, + 'output_features': output_features, + 'combiner': {'type': 'concat'}, + 'training': { + 'epochs': 7, + 'early_stop': 1000, + 'batch_size': 16, + 'optimizer': {'type': 'adam'} + } + } + + # create sub-directory to store results + results_dir = tmp_path / 'results' + results_dir.mkdir() + + exp_dir_name = full_experiment( + model_definition, + data_train_df=generated_data.train_df, + data_validation_df=generated_data.validation_df, + data_test_df=generated_data.test_df, + output_directory='results' #results_dir + ) + + y_pred1 = np.load(os.path.join(exp_dir_name, 'y_predictions.npy')) + + model_definition['training']['epochs'] = 15 + + full_experiment( + model_definition, + data_train_df=generated_data.train_df, + data_validation_df=generated_data.validation_df, + data_test_df=generated_data.test_df, + model_resume_path=exp_dir_name + ) + + y_pred2 = np.load(os.path.join(exp_dir_name, 'y_predictions.npy')) + + assert np.all(np.isclose(y_pred1, y_pred2)) + +# work-in-progress +# def test_model_save_resume(generated_data, tmp_path): +# +# input_features, output_features = get_feature_definitions() +# model_definition = { +# 'input_features': input_features, +# 'output_features': output_features, +# 'combiner': {'type': 'concat'}, +# 'training': {'epochs': 3, 'batch_size': 16} +# } +# +# # create sub-directory to store results +# results_dir = tmp_path / 'results' +# results_dir.mkdir() +# +# # perform inital model training +# ludwig_model = LudwigModel(model_definition) +# train_stats = ludwig_model.train( +# data_train_df=generated_data.train_df, +# data_validation_df=generated_data.validation_df, +# data_test_df=generated_data.test_df, +# output_directory='results' #results_dir +# ) +# +# # load saved model definition +# ludwig_model2 = LudwigModel.load( +# os.path.join(ludwig_model.exp_dir_name, 'model') +# ) +# +# for _, i_feature in ludwig_model2.model.ecd.input_features.items(): +# i_feature.encoder_obj(None, training=False) +# +# ludwig_model2.model.ecd.combiner({'x': {'encoder_output': [None]}}, training=False) +# +# for _, o_feature in ludwig_model2.model.ecd.output_features.items(): +# o_feature.decoder_obj(None, training=False) +# +# pass + + +def test_model_save_reload_API(csv_filename, tmp_path): + dir_path = os.path.dirname(csv_filename) + image_dest_folder = os.path.join(os.getcwd(), 'generated_images') + audio_dest_folder = os.path.join(os.getcwd(), 'generated_audio') + + input_features = [ + binary_feature(), + numerical_feature(), + category_feature(vocab_size=3), + sequence_feature(vocab_size=3), + text_feature(vocab_size=3), + vector_feature(), + image_feature(image_dest_folder), + audio_feature(audio_dest_folder), + timeseries_feature(), + date_feature(), + h3_feature(), + set_feature(vocab_size=3), + bag_feature(vocab_size=3), + ] + + output_features = [ + binary_feature(), + numerical_feature(), + category_feature(vocab_size=3), + # sequence_feature(vocab_size=3), + # text_feature(vocab_size=3), + set_feature(vocab_size=3), + vector_feature(), + ] + + # Generate test data + data_csv_path = generate_data(input_features, output_features, + csv_filename) + + ############# + # Train model + ############# + model_definition = { + 'input_features': input_features, + 'output_features': output_features, + 'training': {'epochs': 2} + } + + data_df = read_csv(data_csv_path) + training_set, test_set, validation_set = split_dataset_tvt( + data_df, + get_split(data_df) + ) + training_set = pd.DataFrame(training_set) + validation_set = pd.DataFrame(validation_set) + test_set = pd.DataFrame(test_set) + + # create sub-directory to store results + results_dir = tmp_path / 'results' + results_dir.mkdir() + + # perform initial model training + ludwig_model1 = LudwigModel(model_definition) + train_stats = ludwig_model1.train( + data_train_df=training_set, + data_validation_df=validation_set, + data_test_df=test_set, + output_directory='results' # results_dir + ) + + preds_1 = ludwig_model1.predict(data_df=validation_set) + + # load saved model + ludwig_model2 = LudwigModel.load( + os.path.join(ludwig_model1.exp_dir_name, 'model') + ) + + preds_2 = ludwig_model2.predict(data_df=validation_set) + + for key in preds_1: + assert preds_1[key].dtype == preds_2[key].dtype + assert preds_1[key].equals(preds_2[key]) + + # col_dtype = preds_1[key].dtype + # if col_dtype in {'int32', 'int64', 'float32', 'float64'}: + # assert np.allclose(preds_1[key], preds_2[key]) + # else: + # assert preds_1[key].equals(preds_2[key]) + + for if_name in ludwig_model1.model.ecd.input_features: + if1 = ludwig_model1.model.ecd.input_features[if_name] + if2 = ludwig_model2.model.ecd.input_features[if_name] + for if1_w, if2_w in zip(if1.encoder_obj.weights, + if2.encoder_obj.weights): + assert np.allclose(if1_w.numpy(), if2_w.numpy()) + + c1 = ludwig_model1.model.ecd.combiner + c2 = ludwig_model2.model.ecd.combiner + for c1_w, c2_w in zip(c1.weights, c2.weights): + assert np.allclose(c1_w.numpy(), c2_w.numpy()) + + for of_name in ludwig_model1.model.ecd.output_features: + of1 = ludwig_model1.model.ecd.output_features[of_name] + of2 = ludwig_model2.model.ecd.output_features[of_name] + for of1_w, of2_w in zip(of1.decoder_obj.weights, + of2.decoder_obj.weights): + assert np.allclose(of1_w.numpy(), of2_w.numpy())