From 2eaffcd8de2e1fc7f7c0ae24f3bebb8c1bb94375 Mon Sep 17 00:00:00 2001 From: Kabir Brar Date: Mon, 25 Oct 2021 15:03:23 -0400 Subject: [PATCH] Add combiner schema validation (#1347) --- ludwig/combiners/combiners.py | 545 ++++++++++++---------- ludwig/models/ecd.py | 8 +- ludwig/utils/schema.py | 29 +- ludwig/utils/schema_utils.py | 137 ++++++ requirements.txt | 3 + tests/integration_tests/test_combiners.py | 49 +- tests/ludwig/utils/test_schema.py | 118 ++++- 7 files changed, 635 insertions(+), 254 deletions(-) create mode 100644 ludwig/utils/schema_utils.py diff --git a/ludwig/combiners/combiners.py b/ludwig/combiners/combiners.py index 5f6809a62d9..ee6efb457f7 100644 --- a/ludwig/combiners/combiners.py +++ b/ludwig/combiners/combiners.py @@ -15,13 +15,17 @@ # limitations under the License. # ============================================================================== import logging -from typing import List +from typing import List, Dict, Optional, Union, Any + +from marshmallow import INCLUDE +from marshmallow_dataclass import dataclass import tensorflow as tf from tensorflow.keras.layers import LayerNormalization from tensorflow.keras.layers import Dense from tensorflow.keras.layers import concatenate +import ludwig.utils.schema_utils as schema from ludwig.constants import NUMERICAL, BINARY, TYPE, NAME from ludwig.encoders.sequence_encoders import ParallelCNN from ludwig.encoders.sequence_encoders import StackedCNN @@ -39,64 +43,81 @@ logger = logging.getLogger(__name__) +sequence_encoder_registry = { + 'stacked_cnn': StackedCNN, + 'parallel_cnn': ParallelCNN, + 'stacked_parallel_cnn': StackedParallelCNN, + 'rnn': StackedRNN, + 'cnnrnn': StackedCNNRNN, + # todo: add transformer + # 'transformer': StackedTransformer, +} + + +@dataclass +class ConcatCombinerConfig: + fc_layers: Optional[List[Dict[str, Any]]] = schema.DictList() + num_fc_layers: int = schema.NonNegativeInteger(default=0) + fc_size: int = schema.PositiveInteger(default=256) + use_bias: bool = True + weights_initializer: str = schema.InitializerOptions(default='glorot_uniform') + bias_initializer: str = schema.InitializerOptions(default='zeros') + weights_regularizer: Optional[str] = schema.RegularizerOptions() + bias_regularizer: Optional[str] = schema.RegularizerOptions() + activity_regularizer: Optional[str] = schema.RegularizerOptions() + norm: Optional[str] = schema.StringOptions(['batch', 'layer']) + norm_params: Optional[dict] = schema.Dict() + activation: str = 'relu' + dropout: float = schema.FloatRange(default=0.0, min=0, max=1) + flatten_inputs: bool = False + residual: bool = False + + class Meta: + unknown = INCLUDE + + class ConcatCombiner(tf.keras.Model): def __init__( self, - input_features=None, - fc_layers=None, - num_fc_layers=None, - fc_size=256, - use_bias=True, - weights_initializer='glorot_uniform', - bias_initializer='zeros', - weights_regularizer=None, - bias_regularizer=None, - activity_regularizer=None, - # weights_constraint=None, - # bias_constraint=None, - norm=None, - norm_params=None, - activation='relu', - dropout=0, - flatten_inputs=False, - residual=False, + input_features: Optional[List] = None, + config: ConcatCombinerConfig = None, **kwargs ): super().__init__() logger.debug(' {}'.format(self.name)) - self.flatten_inputs = flatten_inputs + self.flatten_inputs = config.flatten_inputs self.fc_stack = None # todo future: this may be redundant, check - if fc_layers is None and \ - num_fc_layers is not None: + if config.fc_layers is None and \ + config.num_fc_layers is not None: fc_layers 
= [] - for i in range(num_fc_layers): - fc_layers.append({'fc_size': fc_size}) + for i in range(config.num_fc_layers): + fc_layers.append({'fc_size': config.fc_size}) - if fc_layers is not None: + if config.fc_layers is not None: logger.debug(' FCStack') self.fc_stack = FCStack( - layers=fc_layers, - num_layers=num_fc_layers, - default_fc_size=fc_size, - default_use_bias=use_bias, - default_weights_initializer=weights_initializer, - default_bias_initializer=bias_initializer, - default_weights_regularizer=weights_regularizer, - default_bias_regularizer=bias_regularizer, - default_activity_regularizer=activity_regularizer, + layers=config.fc_layers, + num_layers=config.num_fc_layers, + default_fc_size=config.fc_size, + default_use_bias=config.use_bias, + default_weights_initializer=config.weights_initializer, + default_bias_initializer=config.bias_initializer, + default_weights_regularizer=config.weights_regularizer, + default_bias_regularizer=config.bias_regularizer, + default_activity_regularizer=config.activity_regularizer, # default_weights_constraint=weights_constraint, # default_bias_constraint=bias_constraint, - default_norm=norm, - default_norm_params=norm_params, - default_activation=activation, - default_dropout=dropout, - residual=residual, + default_norm=config.norm, + default_norm_params=config.norm_params, + default_activation=config.activation, + default_dropout=config.dropout, + residual=config.residual, ) - if input_features and len(input_features) == 1 and fc_layers is None: + if input_features and len(input_features) == 1 and config.fc_layers is None: self.supports_masking = True def call( @@ -138,22 +159,34 @@ def call( return return_data + @staticmethod + def get_schema_cls(): + return ConcatCombinerConfig + + +@dataclass +class SequenceConcatCombinerConfig: + main_sequence_feature: Optional[str] = None + reduce_output: Optional[str] = schema.ReductionOptions() + + class Meta: + unknown = INCLUDE + class SequenceConcatCombiner(tf.keras.Model): def __init__( self, - reduce_output=None, - main_sequence_feature=None, + config: SequenceConcatCombinerConfig = None, **kwargs ): super().__init__() logger.debug(' {}'.format(self.name)) - self.reduce_output = reduce_output - self.reduce_sequence = SequenceReducer(reduce_mode=reduce_output) + self.reduce_output = config.reduce_output + self.reduce_sequence = SequenceReducer(reduce_mode=config.reduce_output) if self.reduce_output is None: self.supports_masking = True - self.main_sequence_feature = main_sequence_feature + self.main_sequence_feature = config.main_sequence_feature def __call__( self, @@ -277,27 +310,35 @@ def __call__( return return_data + @staticmethod + def get_schema_cls(): + return SequenceConcatCombinerConfig + + +@dataclass +class SequenceCombinerConfig: + main_sequence_feature: Optional[str] = None + reduce_output: Optional[str] = schema.ReductionOptions() + encoder: Optional[str] = schema.StringOptions(list(sequence_encoder_registry.keys())) + + class Meta: + unknown = INCLUDE + class SequenceCombiner(tf.keras.Model): def __init__( self, - reduce_output=None, - main_sequence_feature=None, - encoder=None, + config: SequenceCombinerConfig = None, **kwargs ): super().__init__() logger.debug(' {}'.format(self.name)) - - self.combiner = SequenceConcatCombiner( - reduce_output=None, - main_sequence_feature=main_sequence_feature - ) + self.combiner = SequenceConcatCombiner(config) self.encoder_obj = get_from_registry( - encoder, sequence_encoder_registry)( + config.encoder, sequence_encoder_registry)( 
should_embed=False, - reduce_output=reduce_output, + reduce_output=config.reduce_output, **kwargs ) @@ -332,42 +373,54 @@ def __call__( return_data[key] = value return return_data + + @staticmethod + def get_schema_cls(): + return SequenceCombinerConfig + + +@dataclass +class TabNetCombinerConfig: + size: int = schema.PositiveInteger(default=32) + output_size: int = schema.PositiveInteger(default=32) + num_steps: int = schema.NonNegativeInteger(default=1) + num_total_blocks: int = schema.NonNegativeInteger(default=4) + num_shared_blocks: int = schema.NonNegativeInteger(default=2) + relaxation_factor: float = 1.5 + bn_epsilon: float = 1e-3 + bn_momentum: float = 0.7 + bn_virtual_bs: Optional[int] = schema.PositiveInteger() + sparsity: float = 1e-5 + dropout: float = schema.FloatRange(default=0.0, min=0, max=1) + + class Meta: + unknown = INCLUDE class TabNetCombiner(tf.keras.Model): def __init__( self, - size: int = 32, # N_a in the paper - output_size: int = 32, # N_d in the paper - num_steps: int = 1, # N_steps in the paper - num_total_blocks: int = 4, - num_shared_blocks: int = 2, - relaxation_factor: float = 1.5, # gamma in the paper - bn_epsilon: float = 1e-3, - bn_momentum: float = 0.7, # m_B in the paper - bn_virtual_bs: int = None, # B_v from the paper - sparsity: float = 1e-5, # lambda_sparse in the paper - dropout=0, + config: TabNetCombinerConfig = None, **kwargs ): super().__init__() logger.debug(' {}'.format(self.name)) self.tabnet = TabNet( - size=size, - output_size=output_size, - num_steps=num_steps, - num_total_blocks=num_total_blocks, - num_shared_blocks=num_shared_blocks, - relaxation_factor=relaxation_factor, - bn_epsilon=bn_epsilon, - bn_momentum=bn_momentum, - bn_virtual_bs=bn_virtual_bs, - sparsity=sparsity + size=config.size, + output_size=config.output_size, + num_steps=config.num_steps, + num_total_blocks=config.num_total_blocks, + num_shared_blocks=config.num_shared_blocks, + relaxation_factor=config.relaxation_factor, + bn_epsilon=config.bn_epsilon, + bn_momentum=config.bn_momentum, + bn_virtual_bs=config.bn_virtual_bs, + sparsity=config.sparsity ) - if dropout > 0: - self.dropout = tf.keras.layers.Dropout(dropout) + if config.dropout > 0: + self.dropout = tf.keras.layers.Dropout(config.dropout) else: self.dropout = None @@ -419,74 +472,84 @@ def call( return return_data + @staticmethod + def get_schema_cls(): + return TabNetCombinerConfig + + +@dataclass +class TransformerCombinerConfig: + num_layers: int = schema.PositiveInteger(default=1) + hidden_size: int = schema.NonNegativeInteger(default=256) + num_heads: int = schema.NonNegativeInteger(default=8) + transformer_fc_size: int = schema.NonNegativeInteger(default=256) + dropout: float = schema.FloatRange(default=0.1, min=0, max=1) + fc_layers: Optional[List[Dict[str, Any]]] = schema.DictList() + num_fc_layers: int = schema.NonNegativeInteger(default=0) + fc_size: int = schema.PositiveInteger(default=256) + use_bias: bool = True + weights_initializer: str = schema.InitializerOptions(default='glorot_uniform') + bias_initializer: str = schema.InitializerOptions(default='zeros') + weights_regularizer: Optional[str] = schema.RegularizerOptions() + bias_regularizer: Optional[str] = schema.RegularizerOptions() + activity_regularizer: Optional[str] = schema.RegularizerOptions() + norm: Optional[str] = schema.StringOptions(['batch', 'layer']) + norm_params: Optional[dict] = schema.Dict() + fc_activation: str = 'relu' + fc_dropout: float = schema.FloatRange(default=0.0, min=0, max=1) + fc_residual: bool = False + 
reduce_output: Optional[str] = schema.ReductionOptions(default='mean') + + class Meta: + unknown = INCLUDE + class TransformerCombiner(tf.keras.Model): def __init__( self, - input_features=None, - num_layers=1, - hidden_size=256, - num_heads=8, - transformer_fc_size=256, - dropout=0.1, - fc_layers=None, - num_fc_layers=0, - fc_size=256, - use_bias=True, - weights_initializer='glorot_uniform', - bias_initializer='zeros', - weights_regularizer=None, - bias_regularizer=None, - activity_regularizer=None, - # weights_constraint=None, - # bias_constraint=None, - norm=None, - norm_params=None, - fc_activation='relu', - fc_dropout=0, - fc_residual=False, - reduce_output='mean', + input_features: Optional[List] = None, + config: TransformerCombinerConfig = None, **kwargs ): super().__init__() logger.debug(' {}'.format(self.name)) - self.reduce_output = reduce_output - self.reduce_sequence = SequenceReducer(reduce_mode=reduce_output) + self.reduce_output = config.reduce_output + self.reduce_sequence = SequenceReducer(reduce_mode=config.reduce_output) if self.reduce_output is None: self.supports_masking = True logger.debug(' Projectors') - self.projectors = [Dense(hidden_size) for _ in input_features] + self.projectors = [Dense(config.hidden_size) for _ in input_features] logger.debug(' TransformerStack') self.transformer_stack = TransformerStack( - hidden_size=hidden_size, - num_heads=num_heads, - fc_size=transformer_fc_size, - num_layers=num_layers, - dropout=dropout + hidden_size=config.hidden_size, + num_heads=config.num_heads, + fc_size=config.transformer_fc_size, + num_layers=config.num_layers, + dropout=config.dropout ) if self.reduce_output is not None: logger.debug(' FCStack') self.fc_stack = FCStack( - layers=fc_layers, - num_layers=num_fc_layers, - default_fc_size=fc_size, - default_use_bias=use_bias, - default_weights_initializer=weights_initializer, - default_bias_initializer=bias_initializer, - default_weights_regularizer=weights_regularizer, - default_bias_regularizer=bias_regularizer, - default_activity_regularizer=activity_regularizer, + layers=config.fc_layers, + num_layers=config.num_fc_layers, + default_fc_size=config.fc_size, + default_use_bias=config.use_bias, + default_weights_initializer=config.weights_initializer, + default_bias_initializer=config.bias_initializer, + default_weights_regularizer=config.weights_regularizer, + default_bias_regularizer=config.bias_regularizer, + default_activity_regularizer=config.activity_regularizer, # default_weights_constraint=weights_constraint, # default_bias_constraint=bias_constraint, - default_norm=norm, - default_norm_params=norm_params, - default_activation=fc_activation, - default_dropout=fc_dropout, - fc_residual=fc_residual, + default_norm=config.norm, + default_norm_params=config.norm_params, + default_activation=config.fc_activation, + default_dropout=config.fc_dropout, + fc_residual=config.fc_residual, ) def call( @@ -539,71 +602,81 @@ def call( return return_data + @staticmethod + def get_schema_cls(): + return TransformerCombinerConfig + + +@dataclass +class TabTransformerCombinerConfig: + embed_input_feature_name: Optional[Union[str, int]] = schema.Embed() + num_layers: int = schema.PositiveInteger(default=1) + hidden_size: int = schema.NonNegativeInteger(default=256) + num_heads: int = schema.NonNegativeInteger(default=8) + transformer_fc_size: int = schema.NonNegativeInteger(default=256) + dropout: float = schema.FloatRange(default=0.1, min=0, max=1) + fc_layers: Optional[List[Dict[str, Any]]] = schema.DictList() + 
num_fc_layers: int = schema.NonNegativeInteger(default=0)
+    fc_size: int = schema.PositiveInteger(default=256)
+    use_bias: bool = True
+    weights_initializer: str = schema.InitializerOptions(default='glorot_uniform')
+    bias_initializer: str = schema.InitializerOptions(default='zeros')
+    weights_regularizer: Optional[str] = schema.RegularizerOptions()
+    bias_regularizer: Optional[str] = schema.RegularizerOptions()
+    activity_regularizer: Optional[str] = schema.RegularizerOptions()
+    norm: Optional[str] = schema.StringOptions(['batch', 'layer'])
+    norm_params: Optional[dict] = schema.Dict()
+    fc_activation: str = 'relu'
+    fc_dropout: float = schema.FloatRange(default=0.0, min=0, max=1)
+    fc_residual: bool = False
+    reduce_output: str = schema.ReductionOptions(default='concat')
+
+    class Meta:
+        unknown = INCLUDE
+
 class TabTransformerCombiner(tf.keras.Model):
     def __init__(
             self,
-            input_features=None,
-            embed_input_feature_name=None,  # None or embedding size or "add"
-            num_layers=1,
-            hidden_size=256,
-            num_heads=8,
-            transformer_fc_size=256,
-            dropout=0.1,
-            fc_layers=None,
-            num_fc_layers=0,
-            fc_size=256,
-            use_bias=True,
-            weights_initializer='glorot_uniform',
-            bias_initializer='zeros',
-            weights_regularizer=None,
-            bias_regularizer=None,
-            activity_regularizer=None,
-            # weights_constraint=None,
-            # bias_constraint=None,
-            norm=None,
-            norm_params=None,
-            fc_activation='relu',
-            fc_dropout=0,
-            fc_residual=False,
-            reduce_output='concat',
+            input_features: Optional[List] = None,
+            config: TabTransformerCombinerConfig = None,
             **kwargs
     ):
         super().__init__()
         logger.debug(' {}'.format(self.name))

-        if reduce_output is None:
+        if config.reduce_output is None:
             raise ValueError("TabTransformer requires the `reduce_output` "
                              "parameter")
-        self.reduce_output = reduce_output
-        self.reduce_sequence = SequenceReducer(reduce_mode=reduce_output)
+        self.reduce_output = config.reduce_output
+        self.reduce_sequence = SequenceReducer(reduce_mode=config.reduce_output)
         self.supports_masking = True
         self.layer_norm = LayerNormalization()

-        self.embed_input_feature_name = embed_input_feature_name
+        self.embed_input_feature_name = config.embed_input_feature_name
         if self.embed_input_feature_name:
             vocab = [i_f for i_f in input_features
                      if i_f[TYPE] != NUMERICAL or i_f[TYPE] != BINARY]
             if self.embed_input_feature_name == 'add':
-                self.embed_i_f_name_layer = Embed(vocab, hidden_size,
+                self.embed_i_f_name_layer = Embed(vocab, config.hidden_size,
                                                   force_embedding_size=True)
-                projector_size = hidden_size
+                projector_size = config.hidden_size
             elif isinstance(self.embed_input_feature_name, int):
-                if self.embed_input_feature_name > hidden_size:
+                if self.embed_input_feature_name > config.hidden_size:
                     raise ValueError(
                         "TabTransformer parameter "
                         "`embed_input_feature_name` "
                         "specified integer value ({}) "
                         "needs to be smaller than "
                         "`hidden_size` ({}).".format(
-                            self.embed_input_feature_name, hidden_size
+                            self.embed_input_feature_name, config.hidden_size
                         ))
                 self.embed_i_f_name_layer = Embed(
                     vocab,
                     self.embed_input_feature_name,
                     force_embedding_size=True,
                 )
-                projector_size = hidden_size - self.embed_input_feature_name
+                projector_size = config.hidden_size - self.embed_input_feature_name
             else:
                 raise ValueError("TabTransformer parameter "
                                  "`embed_input_feature_name` "
@@ -611,7 +684,7 @@ def __init__(
                                  "the current value is "
                                  "{}".format(self.embed_input_feature_name))
         else:
-            projector_size = hidden_size
+            projector_size = config.hidden_size

         logger.debug(' Projectors')
         self.projectors = [Dense(projector_size) for i_f
in input_features @@ -621,31 +694,32 @@ def __init__( logger.debug(' TransformerStack') self.transformer_stack = TransformerStack( - hidden_size=hidden_size, - num_heads=num_heads, - fc_size=transformer_fc_size, - num_layers=num_layers, - dropout=dropout + hidden_size=config.hidden_size, + num_heads=config.num_heads, + fc_size=config.transformer_fc_size, + num_layers=config.num_layers, + dropout=config.dropout ) logger.debug(' FCStack') + self.fc_stack = FCStack( - layers=fc_layers, - num_layers=num_fc_layers, - default_fc_size=fc_size, - default_use_bias=use_bias, - default_weights_initializer=weights_initializer, - default_bias_initializer=bias_initializer, - default_weights_regularizer=weights_regularizer, - default_bias_regularizer=bias_regularizer, - default_activity_regularizer=activity_regularizer, + layers=config.fc_layers, + num_layers=config.num_fc_layers, + default_fc_size=config.fc_size, + default_use_bias=config.use_bias, + default_weights_initializer=config.weights_initializer, + default_bias_initializer=config.bias_initializer, + default_weights_regularizer=config.weights_regularizer, + default_bias_regularizer=config.bias_regularizer, + default_activity_regularizer=config.activity_regularizer, # default_weights_constraint=weights_constraint, # default_bias_constraint=bias_constraint, - default_norm=norm, - default_norm_params=norm_params, - default_activation=fc_activation, - default_dropout=fc_dropout, - fc_residual=fc_residual, + default_norm=config.norm, + default_norm_params=config.norm_params, + default_activation=config.fc_activation, + default_dropout=config.fc_dropout, + fc_residual=config.fc_residual, ) def call( @@ -722,28 +796,37 @@ def call( return_data[key] = value return return_data + + @staticmethod + def get_schema_cls(): + return TabTransformerCombinerConfig + + +@dataclass +class ComparatorCombinerConfig: + entity_1: List[str] + entity_2: List[str] + num_fc_layers: int = schema.NonNegativeInteger(default=1) + fc_size: int = schema.PositiveInteger(default=256) + use_bias: bool = True + weights_initializer: str = schema.InitializerOptions(default='glorot_uniform') + bias_initializer: str = schema.InitializerOptions(default='zeros') + weights_regularizer: Optional[str] = schema.RegularizerOptions() + bias_regularizer: Optional[str] = schema.RegularizerOptions() + activity_regularizer: Optional[str] = schema.RegularizerOptions() + norm: Optional[str] = schema.StringOptions(['batch', 'layer']) + norm_params: Optional[dict] = schema.Dict() + activation: str = 'relu' + dropout: float = schema.FloatRange(default=0.0, min=0, max=1) + + class Meta: + unknown = INCLUDE class ComparatorCombiner(tf.keras.Model): def __init__( self, - entity_1: List[str], - entity_2: List[str], - # fc_layers=None, - num_fc_layers=1, - fc_size=256, - use_bias=True, - weights_initializer="glorot_uniform", - bias_initializer="zeros", - weights_regularizer=None, - bias_regularizer=None, - activity_regularizer=None, - # weights_constraint=None, - # bias_constraint=None, - norm=None, - norm_params=None, - activation="relu", - dropout=0, + config: ComparatorCombinerConfig = None, **kwargs, ): super().__init__() @@ -754,56 +837,56 @@ def __init__( # todo future: this may be redundant, check # if fc_layers is None and num_fc_layers is not None: fc_layers = [] - for i in range(num_fc_layers): - fc_layers.append({"fc_size": fc_size}) + for i in range(config.num_fc_layers): + fc_layers.append({"fc_size": config.fc_size}) if fc_layers is not None: logger.debug(" FCStack") self.e1_fc_stack = FCStack( 
layers=fc_layers, - num_layers=num_fc_layers, - default_fc_size=fc_size, - default_use_bias=use_bias, - default_weights_initializer=weights_initializer, - default_bias_initializer=bias_initializer, - default_weights_regularizer=weights_regularizer, - default_bias_regularizer=bias_regularizer, - default_activity_regularizer=activity_regularizer, + num_layers=config.num_fc_layers, + default_fc_size=config.fc_size, + default_use_bias=config.use_bias, + default_weights_initializer=config.weights_initializer, + default_bias_initializer=config.bias_initializer, + default_weights_regularizer=config.weights_regularizer, + default_bias_regularizer=config.bias_regularizer, + default_activity_regularizer=config.activity_regularizer, # default_weights_constraint=weights_constraint, # default_bias_constraint=bias_constraint, - default_norm=norm, - default_norm_params=norm_params, - default_activation=activation, - default_dropout=dropout, + default_norm=config.norm, + default_norm_params=config.norm_params, + default_activation=config.activation, + default_dropout=config.dropout, ) self.e2_fc_stack = FCStack( layers=fc_layers, - num_layers=num_fc_layers, - default_fc_size=fc_size, - default_use_bias=use_bias, - default_weights_initializer=weights_initializer, - default_bias_initializer=bias_initializer, - default_weights_regularizer=weights_regularizer, - default_bias_regularizer=bias_regularizer, - default_activity_regularizer=activity_regularizer, + num_layers=config.num_fc_layers, + default_fc_size=config.fc_size, + default_use_bias=config.use_bias, + default_weights_initializer=config.weights_initializer, + default_bias_initializer=config.bias_initializer, + default_weights_regularizer=config.weights_regularizer, + default_bias_regularizer=config.bias_regularizer, + default_activity_regularizer=config.activity_regularizer, # default_weights_constraint=weights_constraint, # default_bias_constraint=bias_constraint, - default_norm=norm, - default_norm_params=norm_params, - default_activation=activation, - default_dropout=dropout, + default_norm=config.norm, + default_norm_params=config.norm_params, + default_activation=config.activation, + default_dropout=config.dropout, ) # todo: this should actually be the size of the last fc layer, # not just fc_size # todo: set initializer and regularization - self.bilinear_weights = tf.random.normal([fc_size, fc_size], + self.bilinear_weights = tf.random.normal([config.fc_size, config.fc_size], dtype=tf.float32) - self.entity_1 = entity_1 - self.entity_2 = entity_2 - self.required_inputs = set(entity_1 + entity_2) - self.fc_size = fc_size + self.entity_1 = config.entity_1 + self.entity_2 = config.entity_2 + self.required_inputs = set(config.entity_1 + config.entity_2) + self.fc_size = config.fc_size def call(self, inputs, training=None, mask=None, **kwargs): # encoder outputs @@ -874,6 +957,10 @@ def call(self, inputs, training=None, mask=None, return {"combiner_output": hidden} + @staticmethod + def get_schema_cls(): + return ComparatorCombinerConfig + def get_combiner_class(combiner_type): return get_from_registry( @@ -891,13 +978,3 @@ def get_combiner_class(combiner_type): "transformer": TransformerCombiner, "tabtransformer": TabTransformerCombiner, } - -sequence_encoder_registry = { - 'stacked_cnn': StackedCNN, - 'parallel_cnn': ParallelCNN, - 'stacked_parallel_cnn': StackedParallelCNN, - 'rnn': StackedRNN, - 'cnnrnn': StackedCNNRNN, - # todo: add transformer - # 'transformer': StackedTransformer, -} diff --git a/ludwig/models/ecd.py b/ludwig/models/ecd.py 
index 4a6b5b96705..02c6b6517aa 100644 --- a/ludwig/models/ecd.py +++ b/ludwig/models/ecd.py @@ -11,6 +11,7 @@ from ludwig.utils.algorithms_utils import topological_sort_feature_dependencies from ludwig.utils.data_utils import clear_data_cache from ludwig.utils.misc_utils import get_from_registry +from ludwig.utils.schema_utils import load_config_with_kwargs logger = logging.getLogger(__name__) @@ -45,9 +46,14 @@ def __init__( # ================ Combiner ================ logger.debug('Combiner {}'.format(combiner_def[TYPE])) combiner_class = get_combiner_class(combiner_def[TYPE]) + config, kwargs = load_config_with_kwargs( + combiner_class.get_schema_cls(), + combiner_def, + ) self.combiner = combiner_class( input_features=self.input_features, - **combiner_def, + config=config, + **kwargs ) # ================ Outputs ================ diff --git a/ludwig/utils/schema.py b/ludwig/utils/schema.py index ba770030edb..bc8df168132 100644 --- a/ludwig/utils/schema.py +++ b/ludwig/utils/schema.py @@ -14,8 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== +import json +import marshmallow_dataclass from jsonschema import validate +from marshmallow_jsonschema import JSONSchema from ludwig.combiners.combiners import combiner_registry from ludwig.features.feature_registries import input_type_registry, output_type_registry @@ -24,7 +27,6 @@ OUTPUT_FEATURE_TYPES = sorted(list(output_type_registry.keys())) COMBINER_TYPES = sorted(list(combiner_registry.keys())) - def get_schema(): schema = { 'type': 'object', @@ -62,11 +64,14 @@ def get_schema(): 'properties': { 'type': {'type': 'string', 'enum': COMBINER_TYPES}, }, + 'allOf': get_combiner_conds(), + 'required': ['type'], }, 'training': {}, 'preprocessing': {}, 'hyperopt': {}, }, + 'definitions': get_custom_definitions(), 'required': ['input_features', 'output_features'] } return schema @@ -132,6 +137,26 @@ def get_output_preproc_conds(): return conds +def get_combiner_conds(): + conds = [] + for combiner_type in COMBINER_TYPES: + combiner_cls = combiner_registry[combiner_type] + schema_cls = combiner_cls.get_schema_cls() + schema = marshmallow_dataclass.class_schema(schema_cls)() + schema_json = JSONSchema().dump(schema) + combiner_json = schema_json['definitions'][schema_cls.__name__]['properties'] + + # TODO: add type to schema: https://github.com/lovasoa/marshmallow_dataclass/issues/62 + combiner_cond = create_cond( + {'type': combiner_type}, + combiner_json + ) + conds.append(combiner_cond) + return conds + +def get_custom_definitions(): + return {} + def create_cond(if_pred, then_pred): return { 'if': { @@ -144,4 +169,4 @@ def create_cond(if_pred, then_pred): def validate_config(config): - validate(instance=config, schema=get_schema()) + validate(instance=config, schema=get_schema()) \ No newline at end of file diff --git a/ludwig/utils/schema_utils.py b/ludwig/utils/schema_utils.py new file mode 100644 index 00000000000..d44aa75805f --- /dev/null +++ b/ludwig/utils/schema_utils.py @@ -0,0 +1,137 @@ +from dataclasses import field + +import marshmallow_dataclass +from marshmallow import fields, validate, ValidationError + +from ludwig.modules.initializer_modules import initializers_registry +from ludwig.modules.reduction_modules import reduce_mode_registry + + +def InitializerOptions(default=None): + return StringOptions( + list(initializers_registry.keys()), + default=default, + nullable=True + ) + + +def 
ReductionOptions(default=None): + return StringOptions( + list(reduce_mode_registry.keys()), + default=default, + nullable=True, + ) + + +def RegularizerOptions(nullable=True): + return StringOptions(['l1', 'l2', 'l1_l2'], nullable=nullable) + + +def StringOptions(options, default=None, nullable=True): + return field(metadata={ + 'marshmallow_field': fields.String( + validate=validate.OneOf(options), + allow_none=nullable, + ) + }, default=default) + + +def PositiveInteger(default=None): + return field(metadata={ + 'marshmallow_field': fields.Integer( + validate=validate.Range(min=1), + allow_none=default is None, + ) + }, default=default) + + +def NonNegativeInteger(default=None): + return field(metadata={ + 'marshmallow_field': fields.Integer( + validate=validate.Range(min=0), + allow_none=True, + ) + }, default=default) + + +def FloatRange(default=None, **kwargs): + return field(metadata={ + 'marshmallow_field': fields.Float( + validate=validate.Range(**kwargs), + allow_none=default is None, + ) + }, default=default) + + +def DictList(): + return field(metadata={ + 'marshmallow_field': fields.List( + fields.Dict(fields.String()), + allow_none=True, + ) + }, default=None) + + +def Dict(): + return field(metadata={ + 'marshmallow_field': fields.Dict( + fields.String(), + allow_none=True, + ) + }, default=None) + + +def Embed(): + return field(metadata={ + 'marshmallow_field': EmbedInputFeatureNameField(allow_none=True) + }, default=None) + + +_embed_options = ['add'] + + +class EmbedInputFeatureNameField(fields.Field): + def _deserialize(self, value, attr, data, **kwargs): + if value is None: + return value + + if isinstance(value, str): + if value not in _embed_options: + raise ValidationError( + f"Expected one of: {_embed_options}, found: {value}" + ) + return value + + if isinstance(value, int): + return value + + raise ValidationError('Field should be int or str') + + def _jsonschema_type_mapping(self): + return { + 'oneOf': [ + {'type': 'string', 'enum': _embed_options}, + {'type': 'integer'}, + {'type': 'null'} + ] + } + + +def load_config(cls, **kwargs): + schema = marshmallow_dataclass.class_schema(cls)() + return schema.load(kwargs) + + +def load_config_with_kwargs(cls, kwargs): + schema = marshmallow_dataclass.class_schema(cls)() + fields = schema.fields.keys() + return load_config( + cls, + **{ + k: v for k, v in kwargs.items() + if k in fields + } + ), { + k: v for k, v in kwargs.items() + if k not in fields + } diff --git a/requirements.txt b/requirements.txt index 7dbd4eb412c..0a2d32265a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,9 @@ tables fsspec dataclasses-json jsonschema +marshmallow +marshmallow-jsonschema +marshmallow-dataclass filelock # new data format support diff --git a/tests/integration_tests/test_combiners.py b/tests/integration_tests/test_combiners.py index b87b621141c..8e1bd79565d 100644 --- a/tests/integration_tests/test_combiners.py +++ b/tests/integration_tests/test_combiners.py @@ -4,15 +4,23 @@ import tensorflow as tf from ludwig.combiners.combiners import ( + ComparatorCombinerConfig, ConcatCombiner, + ConcatCombinerConfig, + SequenceCombinerConfig, SequenceConcatCombiner, SequenceCombiner, + SequenceConcatCombinerConfig, TabNetCombiner, ComparatorCombiner, + TabNetCombinerConfig, + TabTransformerCombinerConfig, TransformerCombiner, TabTransformerCombiner, + TransformerCombinerConfig, sequence_encoder_registry, ) +from ludwig.utils.schema_utils import load_config logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) 
@@ -121,7 +129,7 @@ def test_concat_combiner(encoder_outputs, fc_layer): del encoder_outputs["feature_4"] # setup combiner to test - combiner = ConcatCombiner(fc_layers=fc_layer) + combiner = ConcatCombiner(config=load_config(ConcatCombinerConfig, fc_layers=fc_layer)) # concatenate encoder outputs results = combiner(encoder_outputs) @@ -149,10 +157,11 @@ def test_concat_combiner(encoder_outputs, fc_layer): def test_sequence_concat_combiner( encoder_outputs, main_sequence_feature, reduce_output ): - combiner = SequenceConcatCombiner( + combiner = SequenceConcatCombiner(config=load_config( + SequenceConcatCombinerConfig, main_sequence_feature=main_sequence_feature, reduce_output=reduce_output - ) + )) # calculate expected hidden size for concatenated tensors hidden_size = 0 @@ -184,11 +193,12 @@ def test_sequence_concat_combiner( def test_sequence_combiner( encoder_outputs, main_sequence_feature, encoder, reduce_output ): - combiner = SequenceCombiner( + combiner = SequenceCombiner(config=load_config( + SequenceCombinerConfig, main_sequence_feature=main_sequence_feature, encoder=encoder, reduce_output=reduce_output, - ) + )) # calculate expected hidden size for concatenated tensors hidden_size = 0 @@ -263,14 +273,15 @@ def test_tabnet_combiner(encoder_outputs_key): encoder_outputs = tabnet_encoder_outputs()[encoder_outputs_key] # setup combiner to test - combiner = TabNetCombiner( + combiner = TabNetCombiner(config=load_config( + TabNetCombinerConfig, size=2, output_size=2, num_steps=3, num_total_blocks=4, num_shared_blocks=2, dropout=0.1 - ) + )) # concatenate encoder outputs results = combiner(encoder_outputs) @@ -295,7 +306,13 @@ def test_comparator_combiner(encoder_comparator_outputs, fc_layer, entity_1, # setup combiner to test set to 256 for case when none as it's the default size fc_size = fc_layer[0]["fc_size"] if fc_layer else 256 combiner = ComparatorCombiner( - entity_1, entity_2, fc_layers=fc_layer, fc_size=fc_size + config=load_config( + ComparatorCombinerConfig, + entity_1=entity_1, + entity_2=entity_2, + # fc_layers=fc_layer, + fc_size=fc_size + ) ) # concatenate encoder outputs @@ -336,7 +353,8 @@ def test_transformer_combiner(encoder_outputs): # setup combiner to test combiner = TransformerCombiner( - input_features=input_features_def + input_features=input_features_def, + config=load_config(TransformerCombinerConfig) ) # concatenate encoder outputs @@ -369,7 +387,8 @@ def test_tabtransformer_combiner(encoder_outputs): # setup combiner to test combiner = TabTransformerCombiner( - input_features=input_features_def + input_features=input_features_def, + config=load_config(TabTransformerCombinerConfig) ) # concatenate encoder outputs @@ -381,7 +400,10 @@ def test_tabtransformer_combiner(encoder_outputs): # setup combiner to test combiner = TabTransformerCombiner( input_features=input_features_def, - embed_input_feature_name=56 + config=load_config( + TabTransformerCombinerConfig, + embed_input_feature_name=56 + ) ) # concatenate encoder outputs @@ -393,7 +415,10 @@ def test_tabtransformer_combiner(encoder_outputs): # setup combiner to test combiner = TabTransformerCombiner( input_features=input_features_def, - embed_input_feature_name='add' + config=load_config( + TabTransformerCombinerConfig, + embed_input_feature_name='add' + ) ) # concatenate encoder outputs diff --git a/tests/ludwig/utils/test_schema.py b/tests/ludwig/utils/test_schema.py index 38861973121..51a3f0b7ab9 100644 --- a/tests/ludwig/utils/test_schema.py +++ b/tests/ludwig/utils/test_schema.py @@ -14,6 +14,7 @@ # 
See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== +import json import pytest from jsonschema.exceptions import ValidationError @@ -33,6 +34,7 @@ from ludwig.features.vector_feature import VectorFeatureMixin from ludwig.utils.defaults import merge_with_defaults +from ludwig.utils.defaults import merge_with_defaults from ludwig.utils.schema import validate_config, OUTPUT_FEATURE_TYPES from tests.integration_tests.utils import ENCODERS, numerical_feature, \ @@ -95,7 +97,6 @@ def test_config_features(): with pytest.raises(ValidationError, match=rf"^'{dtype}' is not one of .*"): validate_config(config) - def test_config_encoders(): for encoder in ENCODERS: config = { @@ -192,7 +193,6 @@ def test_config_bad_preprocessing_param(): with pytest.raises(ValidationError, match=r"^'fake' is not one of .*"): validate_config(config) - def test_config_bad_combiner(): config = { 'input_features': [ @@ -201,23 +201,131 @@ def test_config_bad_combiner(): ], 'output_features': [binary_feature(weight_regularization=None)], 'combiner': { - 'type': 'tabnet' + 'type': 'tabnet', } } # config is valid at this point validate_config(config) - # bad combiner + # combiner without type + del config['combiner']['type'] + with pytest.raises(ValidationError, match=r"^'type' is a required .*"): + validate_config(config) + + # bad combiner type config['combiner']['type'] = 'fake' with pytest.raises(ValidationError, match=r"^'fake' is not one of .*"): validate_config(config) # bad combiner format (list instead of dict) config['combiner'] = [{'type': 'tabnet'}] - with pytest.raises(ValidationError): + with pytest.raises(ValidationError, match=r"^\[\{'type': 'tabnet'\}\] is not of .*"): + validate_config(config) + + # bad combiner parameter types + config['combiner'] = { + 'type': 'tabtransformer', + 'num_layers': 10, + 'dropout': False, + } + with pytest.raises(ValidationError, match=r"^False is not of type.*"): + validate_config(config) + + # bad combiner parameter range + config['combiner'] = { + 'type': 'transformer', + 'dropout': -1, + } + with pytest.raises(ValidationError, match=r"less than the minimum.*"): validate_config(config) +def test_config_bad_combiner_types_enums(): + config = { + 'input_features': [ + category_feature(vocab_size=2, reduce_input='sum'), + numerical_feature(), + ], + 'output_features': [binary_feature(weight_regularization=None)], + 'combiner': { + 'type': 'concat', + 'weights_initializer': 'zeros' + }, + } + + # config is valid at this point + validate_config(config) + + # Test weights initializer: + config['combiner']['weights_initializer'] = {'test': 'fail'} + with pytest.raises(ValidationError, match=r"{'test': 'fail'} is not of*"): + validate_config(config) + config['combiner']['weights_initializer'] = 'fail' + with pytest.raises(ValidationError, match=r"'fail' is not of*"): + validate_config(config) + + # Test bias initializer: + del config['combiner']['weights_initializer'] + config['combiner']['bias_initializer'] = 'variance_scaling' + validate_config(config) + config['combiner']['bias_initializer'] = 'fail' + with pytest.raises(ValidationError, match=r"'fail' is not of*"): + validate_config(config) + + # Test weights regularizer: + del config['combiner']['bias_initializer'] + config['combiner']['weights_regularizer'] = 'l1' + validate_config(config) + config['combiner']['weights_regularizer'] = 'fail' + with pytest.raises(ValidationError, match=r"'fail' is 
not one of*"): + validate_config(config) + + # Test bias regularizer: + del config['combiner']['weights_regularizer'] + config['combiner']['bias_regularizer'] = 'l1_l2' + validate_config(config) + config['combiner']['bias_regularizer'] = 'fail' + with pytest.raises(ValidationError, match=r"'fail' is not one of*"): + validate_config(config) + + # Test activity regularizer: + del config['combiner']['bias_regularizer'] + config['combiner']['activity_regularizer'] = 'l1_l2' + validate_config(config) + config['combiner']['activity_regularizer'] = 'fail' + with pytest.raises(ValidationError, match=r"'fail' is not one of*"): + validate_config(config) + + # Test norm: + del config['combiner']['activity_regularizer'] + config['combiner']['norm'] = 'batch' + validate_config(config) + config['combiner']['norm'] = 'fail' + with pytest.raises(ValidationError, match=r"'fail' is not one of*"): + validate_config(config) + + # Test activation: + del config['combiner']['norm'] + config['combiner']['activation'] = 'relu' + validate_config(config) + config['combiner']['activation'] = 123 + with pytest.raises(ValidationError, match=r"123 is not of type*"): + validate_config(config) + + # Test reduce_output: + del config['combiner']['activation'] + config2 = {**config} + config2['combiner']['type'] = 'tabtransformer' + config2['combiner']['reduce_output'] = 'sum' + validate_config(config) + config2['combiner']['reduce_output'] = 'fail' + with pytest.raises(ValidationError, match=r"'fail' is not one of*"): + validate_config(config2) + + # Test reduce_output = None: + config2['combiner']['reduce_output'] = None + validate_config(config2) + def test_config_fill_values(): vector_fill_values = [