-
Notifications
You must be signed in to change notification settings - Fork 3
/
feature_generator.py
913 lines (780 loc) · 46.9 KB
/
feature_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
import functools
import logging
import re
from abc import ABC, abstractmethod
from typing import Sequence, List, Union, Callable, Any, Dict, TYPE_CHECKING, Optional
import numpy as np
import pandas as pd
from .. import util, data_transformation
from ..data_transformation import DFTNormalisation, DFTFromFeatureGenerator, DataFrameTransformer
from ..util import flatten_arguments
from ..util.string import or_regex_group, ToStringMixin, list_string
from ..util.typing import PandasNamedTuple
if TYPE_CHECKING:
from ..vector_model import VectorModel
from ..columngen import ColumnGenerator
log = logging.getLogger(__name__)
class DuplicateColumnNamesException(Exception):
    """Signals that a generated feature data frame contains the same column name more than once."""
class FeatureGenerator(ToStringMixin, ABC):
    """
    Base class for feature generators that create a new DataFrame containing feature values
    from an input DataFrame.

    Subclasses implement :meth:`_fit` and :meth:`_generate`; the public :meth:`fit` and :meth:`generate`
    methods wrap these with fitted-state tracking, duplicate-column checks, categorical dtype conversion
    and finalisation of the normalisation rule template.
    """
    def __init__(self,
            categorical_feature_names: Optional[Union[Sequence[str], str]] = None,
            normalisation_rules: Sequence[data_transformation.DFTNormalisation.Rule] = (),
            normalisation_rule_template: Optional[data_transformation.DFTNormalisation.RuleTemplate] = None,
            add_categorical_default_rules: bool = True):
        """
        :param categorical_feature_names: either a sequence of column names or a regex that is to match all categorical
            feature names. The regex must match only features generated by this feature generator, i.e. it should not
            match feature names generated by other feature generators.
            It will be ensured that the respective columns in the generated data frames will have dtype 'category'.
            Furthermore, the presence of meta-information can later be leveraged for further transformations,
            e.g. one-hot encoding.
        :param normalisation_rules: Rules to be used by DFTNormalisation (e.g. for constructing an input transformer
            for a model). These rules are only relevant if a DFTNormalisation object consuming them is instantiated
            and used within a data processing pipeline. They do not affect feature generation.
        :param normalisation_rule_template: This parameter can be supplied instead of `normalisation_rules` for the
            case where there shall be a single rule that applies to all columns generated by this feature generator
            that were not labeled as categorical. Like normalisation_rules, this is only relevant if a
            DFTNormalisation object consuming normalisation rules is instantiated and used within a data processing
            pipeline. It does not affect feature generation.
        :param add_categorical_default_rules:
            If True, normalisation rules for categorical features (which are unsupported by normalisation) and their
            corresponding one-hot encoded features (with "_<index>" appended) will be added.
            It does not affect feature generation.
        :raises ValueError: if both non-empty `normalisation_rules` and a `normalisation_rule_template` are given
        """
        # NOTE: While it would be more elegant to not have all of the above constructor arguments and instead provide
        # them later using "with*" methods, this would have the significant drawback that it would enable
        # all such attributes to be provided in all subclasses, even in ones where we know settings exactly
        # and can provide them directly in the subclass constructor implementation. Thus it would enable
        # non-sensical settings which should be avoided.
        if len(normalisation_rules) > 0 and normalisation_rule_template is not None:
            raise ValueError("Normalisation rules should be empty when a rule template is provided")

        self._generatedColumnNames = None
        self.__categoricalFeatureNames = categorical_feature_names

        # derive a single regex from whichever form (regex string or sequence of names) was passed
        if isinstance(categorical_feature_names, str):
            categorical_feature_name_regex = categorical_feature_names
        elif categorical_feature_names is not None and len(categorical_feature_names) > 0:
            categorical_feature_name_regex = or_regex_group(categorical_feature_names)
        else:
            categorical_feature_name_regex = None
        self._categoricalFeatureNameRegex: Optional[str] = categorical_feature_name_regex
        self._categoricalFeatureRules = []

        if normalisation_rule_template is not None:
            # Note: placeholder rule's regex will be set in generate
            self._normalisationRules = [normalisation_rule_template.to_placeholder_rule()]
            self._mustUpdateNormalisationRuleBasedOnColumnNames = True
        else:
            self._normalisationRules = list(normalisation_rules)
            self._mustUpdateNormalisationRuleBasedOnColumnNames = False

        if add_categorical_default_rules and categorical_feature_name_regex is not None:
            self._categoricalFeatureRules.append(
                data_transformation.DFTNormalisation.Rule(categorical_feature_name_regex, unsupported=True))
            # rule for one-hot transformation
            self._categoricalFeatureRules.append(
                data_transformation.DFTNormalisation.Rule(categorical_feature_name_regex + r"_\d+", skip=True))

        self._name: Optional[str] = None
        self._isFitted = False

    # for backwards compatibility with persisted Featuregens based on code prior to commit 7088cbbe
    # They lack the __isFitted attribute and we assume that each such Featuregen was fitted
    def __setstate__(self, d):
        d.setdefault("_isFitted", True)
        self.__dict__ = d

    def _tostring_exclude_private(self) -> bool:
        return True

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        return dict(name=self.get_name())

    def get_name(self) -> str:
        """
        :return: the name of this feature generator, which may be a default name (class name plus object id)
            if the name has not been set. Note that feature generators created by a FeatureGeneratorFactory
            always get the name with which the generator factory was registered.
        """
        if self._name is None:
            return f"{self.__class__.__name__}-{id(self)}"
        return self._name

    def set_name(self, name: str) -> None:
        """
        :param name: the name to assign to this feature generator
        """
        self._name = name

    def get_names(self) -> List[str]:
        """
        :return: the list of names of feature generators; will be a list with a single name for a regular
            feature generator
        """
        return [self.get_name()]

    def info(self):
        """
        :return: a dictionary with descriptive information on this feature generator (name, categorical feature
            names as passed at construction, generated column names, fitted state and normalisation rules)
        """
        return {
            "name": self.get_name(),
            "categoricalFeatureNames": self.__categoricalFeatureNames,
            "generatedColumnNames": self.get_generated_column_names(),
            "isFitted": self.is_fitted(),
            "normalisationRules": self.get_normalisation_rules(),
        }

    def get_normalisation_rules(self, include_generated_categorical_rules=True) -> List[data_transformation.DFTNormalisation.Rule]:
        """
        :param include_generated_categorical_rules: whether to append the default rules that were generated
            for categorical features to the result
        :return: the list of normalisation rules; if `include_generated_categorical_rules` is False, this is
            the internally held list itself (not a copy)
        """
        if include_generated_categorical_rules:
            return self._normalisationRules + self._categoricalFeatureRules
        return self._normalisationRules

    def get_categorical_feature_name_regex(self) -> Optional[str]:
        """
        :return: the regex matching categorical feature names or None if there are no categorical features
        """
        return self._categoricalFeatureNameRegex

    def is_categorical_feature(self, feature_name):
        """
        :param feature_name: the feature/column name to check
        :return: True if the name fully matches the categorical feature name regex, False otherwise
            (in particular if no such regex is defined)
        """
        if self._categoricalFeatureNameRegex is None:
            return False
        return re.fullmatch(self._categoricalFeatureNameRegex, feature_name) is not None

    def get_generated_column_names(self) -> Optional[List[str]]:
        """
        :return: Column names of the data frame generated by the most recent call of the feature generator's
            'generate' method (stored as the data frame's ``columns`` object).
            Returns None if generate was never called.
        """
        return self._generatedColumnNames

    def to_dft(self):
        """
        :return: a DFTFromFeatureGenerator wrapping this feature generator
        """
        return DFTFromFeatureGenerator(self)

    @abstractmethod
    def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        """
        Fits the feature generator based on the given data

        :param x: the input/features data frame for the learning problem
        :param y: the corresponding output data frame for the learning problem
            (which will typically contain regression or classification target columns)
        :param ctx: a context object whose functionality may be required for feature generation;
            this is typically the model instance that this feature generator is to generate inputs for
        """
        pass

    def fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        """
        Fits the feature generator based on the given data and marks it as fitted

        :param x: the input/features data frame for the learning problem
        :param y: the corresponding output data frame for the learning problem
            (which will typically contain regression or classification target columns)
        :param ctx: a context object whose functionality may be required for feature generation;
            this is typically the model instance that this feature generator is to generate inputs for
        """
        log.debug(f"Fitting {self}")
        self._fit(x, y=y, ctx=ctx)
        self._isFitted = True

    def is_fitted(self):
        """
        :return: True if this feature generator has been fitted
        """
        return self._isFitted

    def generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        Generates features for the data points in the given data frame

        :param df: the input data frame for which to generate features
        :param ctx: a context object whose functionality may be required for feature generation;
            this is typically the model instance that this feature generator is to generate inputs for
        :return: a data frame containing the generated features (as returned by :meth:`_generate`, with
            categorical columns converted to dtype 'category')
        :raises Exception: if the feature generator is not fitted
        :raises DuplicateColumnNamesException: if the generated data frame contains duplicate column names
        """
        if not self.is_fitted():
            raise Exception(f"Cannot generate features from a FeatureGenerator which is not fitted: "
                            f"the feature generator {self.get_name()} requires fitting")

        log.debug(f"Generating features with {self}")
        result_df = self._generate(df, ctx=ctx)

        is_column_duplicated_array = result_df.columns.duplicated()
        if any(is_column_duplicated_array):
            duplicated_columns = set(result_df.columns[is_column_duplicated_array])
            raise DuplicateColumnNamesException(f"Feature data frame contains duplicate column names: {duplicated_columns}")

        # ensure that categorical columns have dtype 'category'
        categorical_feature_names = []
        if self._categoricalFeatureNameRegex is not None:
            # result_df we got might be a view of some other DF, so before we modify it, we must copy it
            result_df = result_df.copy()
            categorical_feature_names = [col for col in result_df.columns if self.is_categorical_feature(col)]
            for col_name in categorical_feature_names:
                if result_df[col_name].dtype.name != 'category':
                    result_df[col_name] = result_df[col_name].astype('category', copy=False)

        self._generatedColumnNames = result_df.columns

        # finalise normalisation rule template (if any) by making it apply to all non-categorical features
        # (a default rule applies to categorical features)
        if self._mustUpdateNormalisationRuleBasedOnColumnNames:
            # preserve the column order so that the resulting regex is deterministic across runs
            categorical_names = set(categorical_feature_names)
            non_categorical_features = [col for col in self._generatedColumnNames if col not in categorical_names]
            # NOTE: We here update the existing rule which was instantiated with a dummy regex because
            # some mechanisms (e.g. MultiFeatureGenerators) retrieve rule instances early on (before generate
            # is ever called) and therefore updating an existing rule is the safe route and should always
            # work, because rules should never actually be applied before generate has indeed been called
            self._normalisationRules[0].set_regex(or_regex_group(non_categorical_features))
            self._mustUpdateNormalisationRuleBasedOnColumnNames = False

        return result_df

    @abstractmethod
    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        Generates features for the data points in the given data frame.

        :param df: the input data frame for which to generate features
        :param ctx: a context object whose functionality may be required for feature generation;
            this is typically the model instance that this feature generator is to generate inputs for
        :return: a data frame containing the generated features, which uses the same index as ``df``.
            The data frame's columns holding categorical columns are not required to have dtype ``category``;
            this will be ensured by the encapsulating call as long as the respective columns' names
            were appropriately provided at construction.
        """
        pass

    def fit_generate(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None) -> pd.DataFrame:
        """
        Fits the feature generator and subsequently generates features for the data points in the given data frame

        :param x: the input data frame for the learning problem and for which to generate features
        :param y: the corresponding output data frame for the learning problem
            (which will typically contain regression or classification target columns)
        :param ctx: a context object whose functionality may be required for feature generation;
            this is typically the model instance that this feature generator is to generate inputs for
        :return: a data frame containing the generated features for ``x``
        """
        self.fit(x, y, ctx)
        return self.generate(x, ctx)

    def flattened(self,
            columns_to_flatten: List[str] = None,
            normalisation_rules=(),
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None,
            keep_other_columns=True) -> "ChainedFeatureGenerator":
        """
        Returns a new feature generator which returns flattened versions of one or more of the vector-valued columns
        generated by this feature generator.

        :param columns_to_flatten: the list of columns to flatten; if None, flatten all columns
        :param normalisation_rules: a list of normalisation rules which apply to the flattened columns
        :param normalisation_rule_template: a normalisation rule template which applies to all generated
            flattened columns
        :param keep_other_columns: if True, any additional columns that are not to be flattened are to be retained
            by the returned feature generator; if False, additional columns are to be discarded
        :return: a feature generator which generates the flattened columns
        """
        return flattened_feature_generator(self, columns_to_flatten=columns_to_flatten,
            normalisation_rules=normalisation_rules, keep_other_columns=keep_other_columns,
            normalisation_rule_template=normalisation_rule_template)

    def concat(self, *others: "FeatureGenerator") -> "MultiFeatureGenerator":
        """
        Concatenates this feature generator with one or more other feature generators in order to produce a
        feature generator that jointly generates all features

        :param others: other feature generators
        :return: a :class:`MultiFeatureGenerator`
        """
        if isinstance(self, MultiFeatureGenerator):
            fgens = list(self.featureGenerators)
        else:
            fgens = [self]
        fgens.extend(others)
        return MultiFeatureGenerator(fgens)

    def chain(self, *others: "FeatureGenerator") -> "ChainedFeatureGenerator":
        """
        Chains this feature generator with one or more other feature generators such that each feature generator
        receives as input the output of the preceding feature generator. The resulting feature generator
        produces the features of the last element in the chain.
        This feature generator itself is left unmodified.

        :param others: other feature generators
        :return: a :class:`ChainedFeatureGenerator`
        """
        if isinstance(self, ChainedFeatureGenerator):
            # copy, such that extending the list below does not modify this instance's own chain
            fgens = list(self.featureGenerators)
        else:
            fgens = [self]
        fgens.extend(others)
        return ChainedFeatureGenerator(fgens)
class RuleBasedFeatureGenerator(FeatureGenerator, ABC):
    """
    A feature generator which does not require fitting
    """
    def fit(self, x, y=None, ctx=None):
        """Does nothing, as there is nothing to fit; all arguments are ignored."""

    def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        """Does nothing, as there is nothing to fit; all arguments are ignored."""

    def is_fitted(self):
        """
        :return: always True
        """
        return True
class MultiFeatureGenerator(FeatureGenerator):
    """
    Wrapper for multiple feature generators. Calling generate here applies all given feature generators
    independently and returns the concatenation of their outputs
    """
    def __init__(self, *feature_generators: Union[FeatureGenerator, List[FeatureGenerator]]):
        """
        :param feature_generators: the feature generators to combine, given as individual arguments
            and/or lists (the arguments are flattened via ``flatten_arguments``)
        """
        self.featureGenerators = feature_generators = flatten_arguments(feature_generators)
        if len(self.featureGenerators) == 0:
            log.debug("Creating an empty MultiFeatureGenerator. It will generate a data frame without columns.")

        # combine the categorical feature regexes of all contained generators into a single alternation
        categorical_feature_name_regexes = [regex for regex in (fg.get_categorical_feature_name_regex()
            for fg in feature_generators) if regex is not None]
        if len(categorical_feature_name_regexes) > 0:
            categorical_feature_names = "|".join(categorical_feature_name_regexes)
        else:
            categorical_feature_names = ()

        # the rules of the contained generators already include their categorical default rules,
        # so no further default rules are to be added here
        normalisation_rules = util.concat_sequences([fg.get_normalisation_rules() for fg in feature_generators])
        super().__init__(categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules,
            add_categorical_default_rules=False)

    def _tostring_object_info(self) -> str:
        return f"featureGenerators={list_string(self.featureGenerators)}"

    def _generate_from_multiple(self, generate_features: Callable[[FeatureGenerator], pd.DataFrame], index) -> pd.DataFrame:
        """
        Applies the given function to each contained feature generator and concatenates the results column-wise

        :param generate_features: the function which, for a given feature generator, returns its feature data frame
        :param index: the index to use for the (column-less) result if there are no feature generators
        :return: the combined data frame
        :raises Exception: if a column name appears more than once in the combined data frame
        """
        dfs = [generate_features(fg) for fg in self.featureGenerators]
        if len(dfs) == 0:
            return pd.DataFrame(index=index)
        combined_df = pd.concat(dfs, axis=1)
        if len(combined_df.columns) != len(set(combined_df.columns)):
            raise Exception(f"At least one column was generated more than once: {list(combined_df.columns)}; "
                            f"check feature generators for correctness!")
        return combined_df

    def _generate(self, input_df: pd.DataFrame, ctx=None):
        def generate_features(fg: FeatureGenerator):
            return fg.generate(input_df, ctx=ctx)

        return self._generate_from_multiple(generate_features, input_df.index)

    def fit_generate(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None) -> pd.DataFrame:
        """
        Fits each contained feature generator and generates its features, returning the concatenated result

        :param x: the input data frame for the learning problem and for which to generate features
        :param y: the corresponding output data frame for the learning problem
        :param ctx: a context object that is passed on to each contained feature generator
        :return: the combined data frame of generated features
        """
        log.debug(f"Fitting and generating features with {self}")

        def generate_features(fg: FeatureGenerator):
            return fg.fit_generate(x, y, ctx)

        return self._generate_from_multiple(generate_features, x.index)

    def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        for fg in self.featureGenerators:
            # pass on the context, consistent with fit_generate and _generate
            fg.fit(x, y, ctx)

    def is_fitted(self):
        """
        :return: True if all contained feature generators are fitted (trivially True if there are none)
        """
        return all(fg.is_fitted() for fg in self.featureGenerators)

    def info(self):
        info = super(MultiFeatureGenerator, self).info()
        info["featureGeneratorNames"] = self.get_names()
        return info

    def get_names(self) -> list:
        """
        :return: the concatenation of the name lists of all contained feature generators
        """
        return [name for fg in self.featureGenerators for name in fg.get_names()]
class FeatureGeneratorFromNamedTuples(FeatureGenerator, ABC):
    """
    Generates feature values for one data point at a time, creating a dictionary with
    feature values from each named tuple
    """
    def __init__(self, cache: util.cache.KeyValueCache = None, categorical_feature_names: Sequence[str] = (),
            normalisation_rules: Sequence[data_transformation.DFTNormalisation.Rule] = (),
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None):
        """
        :param cache: an optional cache in which the feature dictionaries are stored, keyed by the data point's
            index value; if None, no caching takes place
        :param categorical_feature_names: passed on to the base class constructor
        :param normalisation_rules: passed on to the base class constructor
        :param normalisation_rule_template: passed on to the base class constructor
        """
        super().__init__(categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template)
        self.cache = cache

    def _generate(self, df: pd.DataFrame, ctx=None):
        feature_dicts = []
        for position, row in enumerate(df.itertuples()):
            row: PandasNamedTuple
            if position % 100 == 0:
                log.debug(f"Generating feature via {self.__class__.__name__} for index {position}")
            # consult the cache first (if any); a result of None is treated as a cache miss
            feature_dict = self.cache.get(row.Index) if self.cache is not None else None
            if feature_dict is None:
                feature_dict = self._generate_feature_dict(row)
                if self.cache is not None:
                    self.cache.set(row.Index, feature_dict)
            feature_dicts.append(feature_dict)
        return pd.DataFrame(feature_dicts, index=df.index)

    @abstractmethod
    def _generate_feature_dict(self, named_tuple) -> Dict[str, Any]:
        """
        Creates a dictionary with feature values from a named tuple

        :param named_tuple: the data point for which to generate features
        :return: a dictionary mapping feature names to values
        """
        pass
class FeatureGeneratorTakeColumns(RuleBasedFeatureGenerator):
    """
    A feature generator which takes (a subset of) the columns of the input data frame as features
    """
    def __init__(self, columns: Union[str, List[str]] = None, except_columns: Sequence[str] = (),
            categorical_feature_names: Optional[Union[Sequence[str], str]] = (),
            normalisation_rules: Sequence[data_transformation.DFTNormalisation.Rule] = (),
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None,
            verify_column_names=True):
        """
        :param columns: name of the column or list of names of columns to be taken. If None, all columns will be taken.
        :param except_columns: list of names of columns to not take if present in the input df
        :param categorical_feature_names: either a sequence of column names or a regex that is to match all
            categorical feature names. The regex must match only features generated by this feature generator,
            i.e. it should not match feature names generated by other feature generators.
            It will be ensured that the respective columns in the generated data frames will have dtype 'category'.
            Furthermore, presence of meta-information can later be leveraged for further transformations,
            e.g. one-hot encoding.
        :param normalisation_rules: Rules to be used by DFTNormalisation (e.g. for constructing an input transformer
            for a model). These rules are only relevant if a DFTNormalisation object consuming them is instantiated
            and used within a data processing pipeline. They do not affect feature generation.
        :param normalisation_rule_template: This parameter can be supplied instead of normalisation_rules for the
            case where there shall be a single rule that applies to all columns generated by this feature generator
            that were not labeled as categorical.
        :param verify_column_names: if True and columns to take were specified, will raise an error in case said
            columns are missing during feature generation. If False, will log on info level instead and take
            only the columns that are present.
        """
        super().__init__(categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template)
        if isinstance(columns, str):
            columns = [columns]
        self.columns = columns
        self.exceptColumns = except_columns
        self.verifyColumnNames = verify_column_names

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        :param df: the input data frame
        :param ctx: unused
        :return: the data frame restricted to the columns to take
        :raises RuntimeError: if explicitly specified columns are missing and column name verification is enabled
        """
        columns_to_take = self.columns if self.columns is not None else df.columns
        columns_to_take = [col for col in columns_to_take if col not in self.exceptColumns]

        if self.columns is not None:
            missing_cols = set(columns_to_take).difference(df.columns)
            if len(missing_cols) > 0:
                missing_cols_notification = f"Columns {missing_cols} were specified but are not present in data frame. " \
                                            f"verifyColumnNames was set to {self.verifyColumnNames}; " \
                                            f"available columns: {list(df.columns)}"
                if self.verifyColumnNames:
                    raise RuntimeError(missing_cols_notification)
                log.info(missing_cols_notification)
                # selecting missing labels would raise a KeyError, so take only the columns actually present
                columns_to_take = [col for col in columns_to_take if col not in missing_cols]

        return df[columns_to_take]

    def info(self):
        info = super().info()
        info["columns"] = self.columns
        info["exceptColumns"] = self.exceptColumns
        return info
class FeatureGeneratorFlattenColumns(RuleBasedFeatureGenerator):
    """
    Instances of this class take columns containing vectors and create a data frame whose columns contain the
    individual entries of these vectors.

    For example, if columns "vec1", "vec2" contain vectors of dimensions dim1, dim2, a data frame with dim1+dim2
    new columns will be created. It will contain the columns "vec1_<i1>", "vec2_<i2>" with
    i1 = 0, ..., dim1-1 and i2 = 0, ..., dim2-1.
    """
    def __init__(self, columns: Optional[Union[str, Sequence[str]]] = None, categorical_feature_names: Sequence[str] = (),
            normalisation_rules: Sequence[data_transformation.DFTNormalisation.Rule] = (),
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None):
        """
        :param columns: name of the column or list of names of columns to be flattened.
            If None, all columns will be flattened.
        :param categorical_feature_names: passed on to the base class constructor
        :param normalisation_rules: passed on to the base class constructor
        :param normalisation_rule_template: passed on to the base class constructor
        """
        super().__init__(categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template)
        self.columns = [columns] if isinstance(columns, str) else columns

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        flattened_df = pd.DataFrame(index=df.index)
        source_columns = df.columns if self.columns is None else self.columns
        for source_column in source_columns:
            log.debug(f"Flattening column {source_column}")
            # NOTE: we found the use of np.stack to produce the most runtime-efficient results.
            # Other variants, e.g. based on lists instead of numpy.arrays, perform much worse.
            matrix = np.stack(df[source_column].values)
            if matrix.ndim != 2:
                raise ValueError(f"Column {source_column} was expected to contain one dimensional vectors, something went wrong")
            # one output column per vector component
            target_columns = [f"{source_column}_{i}" for i in range(matrix.shape[1])]
            log.debug(f"Flattening resulted in {len(target_columns)} new columns")
            flattened_df[target_columns] = pd.DataFrame(matrix, index=df.index)
        return flattened_df

    def info(self):
        details = super().info()
        details["columns"] = self.columns
        return details
class FeatureGeneratorFromColumnGenerator(RuleBasedFeatureGenerator):
    """
    Implements a feature generator via a column generator
    """
    log = log.getChild(__qualname__)

    def __init__(self, column_gen: 'ColumnGenerator', take_input_column_if_present=False, is_categorical=False,
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None):
        """
        :param column_gen: the underlying column generator
        :param take_input_column_if_present: if True, then if a column whose name corresponds to the column to
            generate exists in the input data, simply take it as the output (without using the column generator);
            if False, always apply the column generator to generate the output
        :param is_categorical: whether the resulting column is categorical
        :param normalisation_rule_template: template for a DFTNormalisation for the resulting column.
            This should only be provided if is_categorical is False
        :raises ValueError: if is_categorical is True and a normalisation rule template is given
        """
        categorical_feature_names = ()
        if is_categorical:
            if normalisation_rule_template is not None:
                raise ValueError("normalisationRuleTemplate should be None when the generated column is categorical")
            categorical_feature_names = (column_gen.generatedColumnName,)
        super().__init__(categorical_feature_names=categorical_feature_names,
            normalisation_rule_template=normalisation_rule_template)
        self.columnGen = column_gen
        self.takeInputColumnIfPresent = take_input_column_if_present

    def info(self):
        details = super().info()
        details["takeInputColumnIfPresent"] = self.takeInputColumnIfPresent
        details["generatedColName"] = self.columnGen.generatedColumnName
        return details

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        target_column = self.columnGen.generatedColumnName
        reuse_input_column = self.takeInputColumnIfPresent and target_column in df.columns
        if not reuse_input_column:
            self.log.debug(f"Generating column '{target_column}' via {self.columnGen}")
            column_values = self.columnGen.generate_column(df)
        else:
            self.log.debug(f"Taking column '{target_column}' from input data frame")
            column_values = df[target_column]
        return pd.DataFrame({target_column: column_values})
class ChainedFeatureGenerator(FeatureGenerator):
    """
    Chains feature generators such that they are executed one after another. The output of generator i
    is the input of generator i+1 in the generator sequence.
    """
    def __init__(self, *feature_generators: Union[FeatureGenerator, List[FeatureGenerator]]):
        """
        :param feature_generators: feature generators to apply in order; the properties of the last feature generator
            determine the relevant meta-data such as categorical feature names and normalisation rules
        :raises ValueError: if no feature generators are given (also if only empty lists are passed)
        """
        self.featureGenerators = flatten_arguments(feature_generators)
        # check the flattened list: the raw argument tuple is non-empty even if only an empty list was passed
        if len(self.featureGenerators) == 0:
            raise ValueError("Empty list of feature generators")
        last_fg: FeatureGenerator = self.featureGenerators[-1]
        super().__init__(
            categorical_feature_names=last_fg.get_categorical_feature_name_regex(),
            normalisation_rules=last_fg.get_normalisation_rules(),
            add_categorical_default_rules=False)

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        for feature_gen in self.featureGenerators:
            df = feature_gen.generate(df, ctx)
        return df

    def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        # each generator must be fitted on the output of its predecessor, which fit_generate takes care of
        self.fit_generate(x, y, ctx)

    def is_fitted(self):
        """
        :return: True if all feature generators in the chain are fitted
        """
        return all(fg.is_fitted() for fg in self.featureGenerators)

    def info(self):
        """
        :return: the base class info dictionary extended by the names of the chained feature generators
        """
        info = super().info()
        info["chainedFeatureGeneratorNames"] = self.get_names()
        return info

    def fit_generate(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None) -> pd.DataFrame:
        """
        Successively fits each feature generator in the chain and applies it, passing its output on as the input
        of the next feature generator

        :param x: the input data frame for the first feature generator in the chain
        :param y: the corresponding output data frame for the learning problem
        :param ctx: a context object that is passed on to each feature generator
        :return: the data frame generated by the last feature generator in the chain
        """
        log.debug(f"Fitting and generating features with {self}")
        for fg in self.featureGenerators:
            x = fg.fit_generate(x, y, ctx)
        return x
class FeatureGeneratorTargetDistribution(FeatureGenerator):
    """
    A feature generator, which, for a column T (typically the categorical target column of a classification problem
    or the continuous target column of a regression problem),

    * can ensure that T takes on a limited set of values t_1, ..., t_n by allowing the user to apply
      binning using given bin boundaries
    * computes for each value c of a categorical column C the conditional empirical distribution
      P(T | C=c) in the training data during the training phase,
    * generates, for each requested column C and value c in the column, n features
      '<C>_<T>_distribution_<t_i>' = P(T=t_i | C=c) if flatten=True
      or one feature '<C>_<T>_distribution' = [P(T=t_1 | C=c), ..., P(T=t_n | C=c)] if flatten=False

    Being probability values, the features generated by this feature generator are already normalised.
    """

    def __init__(self,
            columns: Union[str, Sequence[str]],
            target_column: str,
            target_column_bins: Optional[Union[Sequence[float], int, pd.IntervalIndex]],
            target_column_in_features_df=False,
            flatten=True):
        """
        :param columns: the categorical columns for which to generate distribution features
        :param target_column: the column the distributions over which will make up the features.
            If target_column_bins is not None, this column will be discretised before computing the conditional
            distributions
        :param target_column_bins: if not None, specifies the binning to apply via pandas.cut
            (see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.cut.html).
            Note that if a value should match no bin, NaN will be generated. To avoid this when specifying bin
            boundaries in a list, -inf and +inf should be used as the first and last entries.
        :param target_column_in_features_df: if True, when fitting will look for target_column in the features
            data frame (X) instead of in the target data frame (Y)
        :param flatten: whether to generate a separate scalar feature per distribution value rather than one feature
            with all of the distribution's values
        """
        self.flatten = flatten
        self.columns = [columns] if isinstance(columns, str) else columns
        self.targetColumn = target_column
        self.targetColumnInFeaturesDf = target_column_in_features_df
        self.targetColumnBins = target_column_bins

        # Flattened features are scalar probabilities, for which normalisation is skipped;
        # the non-flattened (list-valued) feature is declared as unsupported for normalisation.
        rule_template_cls = data_transformation.DFTNormalisation.RuleTemplate
        template = rule_template_cls(skip=True) if self.flatten else rule_template_cls(unsupported=True)
        super().__init__(normalisation_rule_template=template)

        self._targetColumnValues = None
        # This will hold the mapping: column -> featureValue -> targetValue -> targetValueEmpiricalProbability
        self._discreteTargetDistributionsByColumn: Optional[Dict[str, Dict[Any, Dict[Any, float]]]] = None

    def info(self):
        """
        :return: the super-class info dictionary, extended by this generator's configuration
        """
        info = super().info()
        info["columns"] = self.columns
        info["targetColumn"] = self.targetColumn
        info["targetColumnBins"] = self.targetColumnBins
        info["flatten"] = self.flatten
        return info

    def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        """
        This will persist the empirical target probability distributions for all unique values in the specified columns

        :param x: the features data frame
        :param y: the target data frame; used as the source of the target column unless
            the generator was configured to find the target column in x
        :param ctx: the context object (not used)
        """
        target_source = x if self.targetColumnInFeaturesDf else y
        target = target_source[self.targetColumn]
        if self.targetColumnBins is None:
            discretised_target = target
        else:
            discretised_target = pd.cut(target, self.targetColumnBins)
        self._targetColumnValues = discretised_target.unique()

        self._discreteTargetDistributionsByColumn = {}
        for column in self.columns:
            column_target_df = pd.DataFrame()
            column_target_df[column] = x[column]
            column_target_df["target"] = discretised_target.values
            # For every value of the column, the normalised value_counts of the associated targets
            # yield the mapping targetValue -> targetValueEmpiricalProbability
            self._discreteTargetDistributionsByColumn[column] = {
                value: group_df["target"].value_counts(normalize=True).to_dict()
                for value, group_df in column_target_df.groupby(column)
            }

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        Generates the distribution features for the configured columns.

        :param df: the input data frame, which must contain the configured columns
        :param ctx: the context object (not used)
        :return: a data frame with the same index as df containing the distribution features
        :raises Exception: if the generator has not been fitted
        """
        distributions_by_column = self._discreteTargetDistributionsByColumn
        if distributions_by_column is None:
            raise Exception("Feature generator has not been fitted")

        result_df = pd.DataFrame(index=df.index)
        for column in self.columns:
            distribution_by_value = distributions_by_column[column]
            if not self.flatten:
                # one list-valued feature holding the probabilities of all target values
                vectors = [[distribution_by_value[value].get(t, 0.0) for t in self._targetColumnValues]
                    for value in df[column]]
                result_df[f"{column}_{self.targetColumn}_distribution"] = pd.Series(vectors, index=df[column].index)
                continue
            for target_value in self._targetColumnValues:
                # Important: pd.Series.apply should not be used here, as it would label the resulting column as categorical
                probabilities = [distribution_by_value[value].get(target_value, 0.0) for value in df[column]]
                result_df[f"{column}_{self.targetColumn}_distribution_{target_value}"] = probabilities
        return result_df
class FeatureGeneratorFromVectorModel(FeatureGenerator):
    """
    Provides features via the predictions of a given model
    """

    def __init__(self,
            vector_model: "VectorModel",
            target_feature_generator: FeatureGenerator,
            categorical_feature_names: Sequence[str] = (),
            normalisation_rules: Sequence[data_transformation.DFTNormalisation.Rule] = (),
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None,
            input_feature_generator: FeatureGenerator = None,
            use_target_feature_generator_for_training=False):
        """
        :param vector_model: model used to generate features from predictions
        :param target_feature_generator: generator for the target to be predicted
        :param categorical_feature_names: passed on to the super-class constructor
        :param normalisation_rules: passed on to the super-class constructor
        :param normalisation_rule_template: passed on to the super-class constructor
        :param input_feature_generator: optional feature generator to be applied to the input of vector_model's
            fit and predict
        :param use_target_feature_generator_for_training: if False, this generator will always apply the model
            to generate features.
            If True, this generator will use target_feature_generator to generate features whenever the given
            context reports not being fitted, bypassing the model. This is useful for the case where the model
            which is to receive the generated features shall be trained on the original targets rather than the
            predictions thereof.
        """
        super().__init__(categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template)
        self.useTargetFeatureGeneratorForTraining = use_target_feature_generator_for_training
        self.targetFeatureGenerator = target_feature_generator
        self.inputFeatureGenerator = input_feature_generator
        self.vectorModel = vector_model

    def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        """
        Fits the target feature generator (on the untransformed input), the optional input feature generator
        and finally the model, which is trained to predict the generated target features.

        :param x: the input data frame
        :param y: the target data frame (optional)
        :param ctx: the context object (not used)
        """
        target_df = self.targetFeatureGenerator.fit_generate(x, y)
        if self.inputFeatureGenerator:
            x = self.inputFeatureGenerator.fit_generate(x, y)
        self.vectorModel.fit(x, target_df)

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        Generates the features, either via the model's predictions or, if so configured and the context
        is not yet fitted, via the target feature generator.

        :param df: the input data frame
        :param ctx: the context object; if it is None, the model is always used
        :return: the generated features
        """
        # ctx defaults to None, in which case nothing is known about the training state
        # and the model is applied.
        bypass_model = self.useTargetFeatureGeneratorForTraining and ctx is not None and not ctx.is_fitted()
        if bypass_model:
            log.debug(f"Using targetFeatureGenerator {self.targetFeatureGenerator.__class__.__name__} to generate target features")
            # The target feature generator was fitted on the untransformed input (see _fit), so it must
            # be applied to the untransformed input here as well, not to the input generator's output.
            return self.targetFeatureGenerator.generate(df)
        if self.inputFeatureGenerator:
            df = self.inputFeatureGenerator.generate(df)
        log.debug(f"Generating target features via {self.vectorModel.__class__.__name__}")
        return self.vectorModel.predict(df)

    def info(self):
        """
        :return: the super-class info dictionary, extended by the string representation of the wrapped model
        """
        info = super().info()
        info["wrappedModel"] = str(self.vectorModel)
        return info
class FeatureGeneratorMapColumn(RuleBasedFeatureGenerator, ABC):
    """
    Creates a single feature from a single input column by applying a function to each element of the input column
    """

    def __init__(self,
            input_col_name: str,
            feature_col_name: str,
            categorical_feature_names: Optional[Union[Sequence[str], str]] = None,
            normalisation_rules: Sequence[data_transformation.DFTNormalisation.Rule] = (),
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None,
            add_categorical_default_rules=True):
        """
        :param input_col_name: the name of the column whose values are to be mapped
        :param feature_col_name: the name of the generated feature column
        :param categorical_feature_names: passed on to the super-class constructor
        :param normalisation_rules: passed on to the super-class constructor
        :param normalisation_rule_template: passed on to the super-class constructor
        :param add_categorical_default_rules: passed on to the super-class constructor
        """
        super().__init__(
            categorical_feature_names=categorical_feature_names,
            normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template,
            add_categorical_default_rules=add_categorical_default_rules)
        self._inputColName = input_col_name
        self._featureColName = feature_col_name

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        :param df: the input data frame, which must contain the input column
        :param ctx: the context object (not used)
        :return: a data frame with the same index as df holding the single generated feature column
        :raises ValueError: if the input column is not present in df
        """
        if self._inputColName in df.columns:
            mapped = df[self._inputColName].apply(self._create_value)
            return pd.DataFrame({self._featureColName: mapped}, index=df.index)
        raise ValueError(f"Column '{self._inputColName}' required by feature generator not found in list of columns: "
                         f"{list(df.columns)}")

    @abstractmethod
    def _create_value(self, value):
        """
        Maps a value from the input column to a feature value

        :param value: a value from the input column
        :return: the feature value
        """
class FeatureGeneratorMapColumnDict(RuleBasedFeatureGenerator, ABC):
    """
    Creates an arbitrary number of features from a single input column by applying a function to each element of the
    input column
    """

    def __init__(self,
            input_col_name: str,
            categorical_feature_names: Optional[Union[Sequence[str], str]] = None,
            normalisation_rules: Sequence[data_transformation.DFTNormalisation.Rule] = (),
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None,
            add_categorical_default_rules=True):
        """
        :param input_col_name: the name of the column whose values are to be mapped
        :param categorical_feature_names: passed on to the super-class constructor
        :param normalisation_rules: passed on to the super-class constructor
        :param normalisation_rule_template: passed on to the super-class constructor
        :param add_categorical_default_rules: passed on to the super-class constructor
        """
        super().__init__(
            categorical_feature_names=categorical_feature_names,
            normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template,
            add_categorical_default_rules=add_categorical_default_rules)
        self._inputColName = input_col_name

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        :param df: the input data frame, which must contain the input column
        :param ctx: the context object (not used)
        :return: a data frame with the same index as df, built from the feature dictionaries
            created for the input column's values
        :raises ValueError: if the input column is not present in df
        """
        if self._inputColName in df.columns:
            # one feature dictionary per row of the input column
            feature_dicts = list(map(self._create_features_dict, df[self._inputColName]))
            return pd.DataFrame(feature_dicts, index=df.index)
        raise ValueError(f"Column '{self._inputColName}' required by feature generator not found in list of columns: "
                         f"{list(df.columns)}")

    @abstractmethod
    def _create_features_dict(self, value) -> Dict[str, Any]:
        """
        Maps a value from the input column to a dictionary containing one or more features.

        :param value: a value from the input column
        :return: a dictionary mapping feature names to values
        """
class FeatureGeneratorNAMarker(RuleBasedFeatureGenerator):
    """
    Creates features indicating whether another feature is N/A (not available).
    It can be practical to use this feature generator in conjunction with DFTFillNA for models that cannot handle
    missing values.
    """

    def __init__(self, columns: List[str], value_a=0, value_na=1):
        """
        Note: When changing the default values used, use only values that are considered to be normalised when using
        this feature generation in a context where DFTNormalisation is used (no normalisation is applied to features
        generated by this feature generator).

        :param columns: the columns for which to generate
        :param value_a: the feature value if the input feature is available
        :param value_na: the feature value if the input feature is not available
        """
        skip_normalisation = DFTNormalisation.RuleTemplate(skip=True)
        super().__init__(normalisation_rule_template=skip_normalisation)
        self.columns = columns
        self.valueA = value_a
        self.valueNA = value_na

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        :param df: the input data frame, which must contain the configured columns
        :param ctx: the context object (not used)
        :return: a data frame with the same index as df containing one marker column '<col>_na' per configured column
        """
        # maps the result of the N/A check to the configured marker value
        marker_by_flag = {True: self.valueNA, False: self.valueA}
        new_cols = {
            f"{col}_na": [marker_by_flag[flag] for flag in df[col].isna()]
            for col in self.columns
        }
        return pd.DataFrame(new_cols, index=df.index)
def flattened_feature_generator(fgen: FeatureGenerator, columns_to_flatten: List[str] = None, keep_other_columns=True,
        normalisation_rules: Sequence[DFTNormalisation.Rule] = (),
        normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None):
    """
    Return a flattening version of the input feature generator.

    :param fgen: the feature generator which generates columns that are to be flattened
    :param columns_to_flatten: list of names of output columns to be flattened; if None, flatten all columns
    :param keep_other_columns: whether any additional columns that are not to be flattened are to be retained
        by the returned feature generator
    :param normalisation_rules: additional normalisation rules for the flattened output columns
    :param normalisation_rule_template: This parameter can be supplied instead of normalisation_rules for the case where
        there shall be a single rule that applies to all flattened output columns
    :return: FeatureGenerator instance that will generate flattened versions of the specified columns and leave
        all other output columns as is.

    Example:

    >>> from sensai.featuregen import FeatureGeneratorTakeColumns, flattened_feature_generator
    >>> import pandas as pd
    >>>
    >>> df = pd.DataFrame({"foo": [[1, 2], [3, 4]], "bar": ["a", "b"]})
    >>> fgen = flattened_feature_generator(FeatureGeneratorTakeColumns(), columns_to_flatten=["foo"])
    >>> fgen.generate(df)
       foo_0  foo_1 bar
    0      1      2   a
    1      3      4   b
    """
    flattener = FeatureGeneratorFlattenColumns(
        columns=columns_to_flatten,
        normalisation_rules=normalisation_rules,
        normalisation_rule_template=normalisation_rule_template)

    # Other columns need to be passed through explicitly only if they are to be kept and
    # a specific subset of columns (rather than all columns) is being flattened.
    retain_others = keep_other_columns and columns_to_flatten is not None
    if not retain_others:
        return ChainedFeatureGenerator(fgen, flattener)

    passthrough = FeatureGeneratorTakeColumns(except_columns=columns_to_flatten)
    return ChainedFeatureGenerator(fgen, MultiFeatureGenerator(flattener, passthrough))
class FeatureGeneratorFromDFT(FeatureGenerator):
    """
    A feature generator which wraps a data frame transformer: fitting fits the transformer and
    feature generation applies it.
    """

    def __init__(self,
            dft: DataFrameTransformer,
            categorical_feature_names: Optional[Union[Sequence[str], str]] = None,
            normalisation_rules: Sequence[data_transformation.DFTNormalisation.Rule] = (),
            normalisation_rule_template: data_transformation.DFTNormalisation.RuleTemplate = None,
            add_categorical_default_rules=True):
        """
        :param dft: the data frame transformer to wrap
        :param categorical_feature_names: passed on to the super-class constructor
        :param normalisation_rules: passed on to the super-class constructor
        :param normalisation_rule_template: passed on to the super-class constructor
        :param add_categorical_default_rules: passed on to the super-class constructor
        """
        super().__init__(
            categorical_feature_names=categorical_feature_names,
            normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template,
            add_categorical_default_rules=add_categorical_default_rules)
        self.dft = dft

    def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        """
        Fits the wrapped transformer on the input data frame; y and ctx are not used.

        :param x: the input data frame
        :param y: the target data frame (not used)
        :param ctx: the context object (not used)
        """
        self.dft.fit(x)

    def _generate(self, df: pd.DataFrame, ctx=None) -> pd.DataFrame:
        """
        :param df: the input data frame
        :param ctx: the context object (not used)
        :return: the result of applying the wrapped transformer to df
        """
        transformed = self.dft.apply(df)
        return transformed