# NOTE: removed non-source artifacts from a web scrape of this file
# (GitHub page chrome and the line-number gutter).
"""Implementation of the class Database, wrapping a pandas dataframe
for specific services to Biogeme
:author: Michel Bierlaire
:date: Tue Mar 26 16:42:54 2019
"""
from __future__ import annotations
import logging
from typing import NamedTuple, TYPE_CHECKING
import numpy as np
import pandas as pd
import biogeme.filenames as bf
import biogeme.tools.database
from biogeme.deprecated import deprecated
from biogeme.exceptions import BiogemeError
from biogeme.expressions import (
Variable,
Expression,
validate_and_convert,
)
from biogeme.native_draws import (
RandomNumberGeneratorTuple,
RandomNumberGenerator,
native_random_number_generators,
)
from biogeme.segmentation import DiscreteSegmentationTuple
if TYPE_CHECKING:
from biogeme.expressions import ExpressionOrNumeric
class EstimationValidation(NamedTuple):
    """Pair of data frames produced by :meth:`Database.split` for
    cross-validation: one set to estimate the model, one to validate it.
    """

    # Data used for model estimation (all slices but one).
    estimation: pd.DataFrame
    # Data used for validation (the held-out slice).
    validation: pd.DataFrame
# Module-level logger, named after the module per the logging convention.
logger: logging.Logger = logging.getLogger(__name__)
"""Logger that controls the output of
messages to the screen and log file.
"""
class Database:
"""Class that contains and prepare the database."""
def __init__(self, name: str, pandas_database: pd.DataFrame):
    """Constructor

    :param name: name of the database.
    :type name: string

    :param pandas_database: data stored in a pandas data frame.
    :type pandas_database: pandas.DataFrame

    :raise BiogemeError: if the audit function detects errors.
    :raise BiogemeError: if the database is empty.
    """
    self.name = name
    """ Name of the database. Used mainly for the file name when
    dumping data.
    """
    # Fail early: an empty data frame cannot be used by any service below.
    if len(pandas_database.index) == 0:
        error_msg = 'Database has no entry'
        raise BiogemeError(error_msg)
    self.data = pandas_database  #: Pandas data frame containing the data.
    self.fullData = pandas_database
    """Pandas data frame containing the full data. Useful when batches of
    the sample are used for approximating the log likelihood.
    """
    self.variables = None
    """names of the headers of the database so that they can be used as
    an object of type biogeme.expressions.Expression. Initialized
    by _generateHeaders()
    """
    self._generate_headers()
    self.excludedData = 0
    """Number of observations removed by the function
    :meth:`biogeme.Database.remove`
    """
    self.panelColumn = None
    """Name of the column identifying the individuals in a panel
    data context. None if data is not panel.
    """
    self.individualMap = None
    """map identifying the range of observations for each individual in a
    panel data context. None if data is not panel.
    """
    self.fullIndividualMap = None
    """complete map identifying the range of observations for each
    individual in a panel data context. None if data is not
    panel. Useful when batches of the sample are used to
    approximate the log likelihood function.
    """
    self.userRandomNumberGenerators: dict[str, RandomNumberGeneratorTuple] = {}
    """Dictionary containing user defined random number
    generators. Defined by the function
    Database.setRandomNumberGenerators that checks that reserved
    keywords are not used. The element of the dictionary is a
    tuple with two elements: (0) the function generating the
    draws, and (1) a string describing the type of draws
    """
    self.number_of_draws = 0
    """Number of draws generated by the function Database.generateDraws.
    Value 0 if this function is not called.
    """
    self.typesOfDraws = {}  #: Types of draws for Monte Carlo integration
    self.theDraws = None  #: Draws for Monte-Carlo integration
    self._avail = None  #: Availability expression to check
    self._choice = None  #: Choice expression to check
    # Audit the data (non-numeric columns, NaN entries) and abort on errors.
    list_of_errors, _ = self._audit()
    # For now, the audit issues only errors. If warnings are
    # triggered in the future, the next lines should be
    # uncommented.
    # if listOfWarnings:
    #     logger.warning('\n'.join(listOfWarnings))
    if list_of_errors:
        logger.warning('\n'.join(list_of_errors))
        raise BiogemeError('\n'.join(list_of_errors))
def _audit(self) -> tuple[list[str], list[str]]:
    """Performs a series of checks and reports warnings and errors.

    - Check if there are non-numerical entries.
    - Check if there are NaN (not a number) entries.

    :return: A tuple of two lists with the results of the diagnostic:
        list_of_errors, list_of_warnings
    :rtype: tuple(list(str), list(str))
    """
    list_of_errors = []
    list_of_warnings = []
    # Every column must be numeric to be usable in expressions.
    for col, dtype in self.data.dtypes.items():
        if not np.issubdtype(dtype, np.number):
            # Fixed garbled message ('does contain {dtype}'): report the
            # offending dtype explicitly.
            the_error = (
                f'Column {col} in the database contains non-numeric data '
                f'of type {dtype}'
            )
            list_of_errors.append(the_error)
    # A single NaN anywhere invalidates the database for estimation.
    if self.data.isnull().values.any():
        the_error = (
            'The database contains NaN value(s). '
            'Detect where they are using the function isnan()'
        )
        list_of_errors.append(the_error)
    return list_of_errors, list_of_warnings
def _generate_headers(self) -> None:
    """Build the mapping from each column name of the data frame to a
    biogeme.expressions.Variable object, stored in ``self.variables``.
    """
    header_map = {}
    for column_name in self.data.columns:
        header_map[column_name] = Variable(column_name)
    self.variables = header_map
def values_from_database(self, expression: Expression) -> pd.Series:
    """Evaluate an expression on every row of the database.

    :param expression: expression to evaluate.
    :return: series, one calculated value per database entry.
    :raise BiogemeError: if the database is empty.
    """
    if self.data.shape[0] == 0:
        error_msg = 'Database has no entry'
        raise BiogemeError(error_msg)
    return expression.get_value_c(database=self, prepare_ids=True)
@deprecated
def valuesFromDatabase(self, expression: Expression) -> pd.Series:
    """Deprecated alias of :meth:`values_from_database`."""
    pass
def check_availability_of_chosen_alt(
    self, avail: dict[int, Expression], choice: Expression
) -> pd.Series:
    """Check if the chosen alternative is available for each entry
    in the database.

    :param avail: dict of expressions, indexed by alternative, to
        evaluate the availability condition of each alternative.
    :type avail: dict(int: biogeme.expressions.Expression)

    :param choice: expression for the chosen alternative.
    :type choice: biogeme.expressions.Expression

    :return: series of bool, as long as the number of entries
        in the database, containing True if the chosen alternative is
        available, False otherwise.
    :rtype: numpy.Series

    :raise BiogemeError: if the chosen alternative does not appear
        in the availability dict
    :raise BiogemeError: if the database is empty.
    """
    self._avail = avail
    self._choice = choice
    if len(self.data.index) == 0:
        error_msg = 'Database has no entry'
        raise BiogemeError(error_msg)
    choice_array = choice.get_value_c(
        database=self, aggregation=False, prepare_ids=True
    )
    calculated_avail = {
        key: expression.get_value_c(
            database=self, aggregation=False, prepare_ids=True
        )
        for key, expression in avail.items()
    }
    try:
        avail_chosen = np.array(
            [calculated_avail[c][i] for i, c in enumerate(choice_array)]
        )
        return avail_chosen != 0
    except KeyError as exc:
        for c in choice_array:
            if c not in calculated_avail:
                err_msg = (
                    f'Chosen alternative {c} does not appear in '
                    f'availability dict: {calculated_avail.keys()}'
                )
                raise BiogemeError(err_msg) from exc
        # Fixed: previously the function fell through and silently
        # returned None if the KeyError did not come from a missing
        # alternative. Propagate the unexpected error instead.
        raise
@deprecated
def checkAvailabilityOfChosenAlt(
    self, avail: dict[int, Expression], choice: Expression
) -> pd.Series:
    """Deprecated alias of :meth:`check_availability_of_chosen_alt`."""
    pass
def choice_availability_statistics(
    self, avail: dict[int, Expression], choice: Expression
) -> dict[int, tuple[int, int]]:
    """Count, for each alternative, how many times it is chosen and how
    many times it is available.

    :param avail: dict of expressions, indexed by alternative, evaluating
        the availability condition of each alternative.
    :param choice: expression for the chosen alternative.
    :return: dict mapping each alternative to the pair
        (number of times chosen, number of times available).
    :raise BiogemeError: if the database is empty.
    """
    if len(self.data.index) == 0:
        error_msg = 'Database has no entry'
        raise BiogemeError(error_msg)
    self._avail = avail
    self._choice = choice
    choice_array = choice.get_value_c(
        database=self,
        aggregation=False,
        prepare_ids=True,
    )
    chosen_values, chosen_counts = np.unique(choice_array, return_counts=True)
    choice_stat = {
        alt: int(count) for alt, count in zip(chosen_values, chosen_counts)
    }
    calculated_avail = {
        key: expression.get_value_c(
            database=self,
            aggregation=False,
            prepare_ids=True,
        )
        for key, expression in avail.items()
    }
    avail_stat = {alt: sum(values) for alt, values in calculated_avail.items()}
    return {
        alt: (chosen, avail_stat[alt]) for alt, chosen in choice_stat.items()
    }
@deprecated(choice_availability_statistics)
def choiceAvailabilityStatistics(
    self, avail: dict[int, Expression], choice: Expression
) -> dict[int, tuple[int, int]]:
    """Deprecated alias of :meth:`choice_availability_statistics`."""
    # Annotation fixed: was the malformed `dict[tuple[int, int]]`.
    pass
def scale_column(self, column: str, scale: float):
    """Multiply every value of a column by a constant factor.

    :param column: name of the column to rescale.
    :param scale: factor by which all values of the column are multiplied.
    """
    self.data[column] = self.data[column] * scale
@deprecated
def scaleColumn(self, column: str, scale: float):
    """Deprecated alias of :meth:`scale_column`."""
    pass
def suggest_scaling(
    self, columns: list[str] | None = None, report_all: bool = False
):
    """Suggest a scaling of the variables in the database.

    For each column, :math:`\\delta` is the difference between the
    largest and the smallest value, or one if the difference is
    smaller than one. The level of magnitude is evaluated as a
    power of 10. The suggested scale is the inverse of this value.

    .. math:: s = \\frac{1}{10^{|\\log_{10} \\delta|}}

    where :math:`|x|` is the integer closest to :math:`x`.

    NOTE(review): the formula above describes a difference max - min,
    but the implementation below uses the largest value in magnitude
    max(|max|, |min|) — confirm which is intended.

    :param columns: list of columns to be considered.
        If None, all of them will be considered.
    :type columns: list(str)

    :param report_all: if False, remove entries where the suggested
        scale is 1, 0.1 or 10
    :type report_all: bool

    :return: A Pandas dataframe where each row contains the name
        of the variable and the suggested scale s. Ideally,
        the column should be multiplied by s.
    :rtype: pandas.DataFrame

    :raise BiogemeError: if a variable in ``columns`` is unknown.
    """
    if columns is None:
        columns = self.data.columns
    else:
        # Validate the requested columns before any computation.
        for c in columns:
            if c not in self.data:
                error_msg = f'Variable {c} not found.'
                raise BiogemeError(error_msg)
    # Largest magnitude observed in each column.
    largest_value = [
        max(np.abs(self.data[col].max()), np.abs(self.data[col].min()))
        for col in columns
    ]
    # Suggested scale: inverse of the rounded order of magnitude.
    res = [
        [col, 1 / 10 ** np.round(np.log10(max(1.0, lv))), lv]
        for col, lv in zip(columns, largest_value)
    ]
    df = pd.DataFrame(res, columns=['Column', 'Scale', 'Largest'])
    if not report_all:
        # Remove entries where the suggested scale is 1, 0.1 or 10
        remove = (df.Scale == 1) | (df.Scale == 0.1) | (df.Scale == 10)
        df.drop(df[remove].index, inplace=True)
    return df
@deprecated
def suggestScaling(
    self, columns: list[str] | None = None, report_all: bool = False
):
    """Deprecated alias of :meth:`suggest_scaling`."""
    pass
def sample_with_replacement(self, size: int | None = None) -> pd.DataFrame:
    """Extract a random sample from the database, with replacement.

    Useful for bootstrapping.

    :param size: size of the sample. If None, the sample has the same
        size as the database. Default: None.
    :return: data frame with the sampled rows.
    """
    if size is None:
        size = len(self.data)
    # Draw row positions uniformly at random, with replacement.
    row_positions = np.random.randint(0, len(self.data), size=size)
    return self.data.iloc[row_positions]
@deprecated
def sampleWithReplacement(self, size: int | None = None) -> pd.DataFrame:
    """Deprecated alias of :meth:`sample_with_replacement`."""
    pass
def sample_individual_map_with_replacement(
    self, size: int | None = None
) -> pd.DataFrame:
    """Extract a random sample of the individual map
    from a panel data database, with replacement.

    Useful for bootstrapping.

    :param size: size of the sample. If None, a sample of
        the same size as the database will be generated.
        Default: None.
    :type size: int

    :return: pandas dataframe with the sample.
    :rtype: pandas.DataFrame

    :raise BiogemeError: if the database is not in panel mode.
    """
    if not self.is_panel():
        # Fixed: the message used to name the deprecated camelCase method.
        error_msg = (
            'Function sample_individual_map_with_replacement'
            ' is available only on panel data.'
        )
        raise BiogemeError(error_msg)
    if size is None:
        size = len(self.individualMap)
    # Draw individuals (rows of the map) with replacement.
    sample = self.individualMap.iloc[
        np.random.randint(0, len(self.individualMap), size=size)
    ]
    return sample
@deprecated
def sampleIndividualMapWithReplacement(
    self, size: int | None = None
) -> pd.DataFrame:
    """Deprecated alias of :meth:`sample_individual_map_with_replacement`."""
    pass
#####
# This has to be reimplemented in a cleaner way
####
# def sampleWithoutReplacement(
# self, samplingRate, columnWithSamplingWeights=None
# ):
# """Replace the data set by a sample for stochastic algorithms
#
# :param samplingRate: the proportion of data to include in the sample.
# :type samplingRate: float
# :param columnWithSamplingWeights: name of the column with
# the sampling weights. If None, each row has equal probability.
# :type columnWithSamplingWeights: string
#
# :raise BiogemeError: if the structure of the database has been modified
# since last sample.
# """
# if self.isPanel():
# if self.fullIndividualMap is None:
# self.fullIndividualMap = self.individualMap
# # Check if the structure has not been modified since
# # last sample
# if set(self.fullIndividualMap.columns) != set(
# self.individualMap.columns
# ):
# message = (
# 'The structure of the database has been '
# 'modified since last sample. '
# )
# left = set(self.fullIndividualMap.columns).difference(
# set(self.individualMap.columns)
# )
# if left:
# message += f' Columns that disappeared: {left}'
# right = set(self.individualMap.columns).difference(
# set(self.fullIndividualMap.columns)
# )
# if right:
# message += f' Columns that were added: {right}'
# raise exceptions.BiogemeError(message)
#
# self.individualMap = self.fullIndividualMap.sample(
# frac=samplingRate, weights=columnWithSamplingWeights
# )
# else:
# # Cross sectional data
# if self.fullData is None:
# self.fullData = self.data
# else:
# # Check if the structure has not been modified since
# # last sample
# if set(self.fullData.columns) != set(self.data.columns):
# message = (
# 'The structure of the database has been modified '
# 'since last sample. '
# )
# left = set(self.fullData.columns).difference(
# set(self.data.columns)
# )
# if left:
# message += f' Columns that disappeared: {left}'
# right = set(self.data.columns).difference(
# set(self.fullData.columns)
# )
# if right:
# message += f' Columns that were added: {right}'
# raise exceptions.BiogemeError(message)
#
# self.data = self.fullData.sample(
# frac=samplingRate, weights=columnWithSamplingWeights
# )
# def useFullSample(self):
# """Re-establish the full sample for calculation of the likelihood"""
# if self.isPanel():
# if self.fullIndividualMap is None:
# raise exceptions.BiogemeError(
# 'Full panel data set has not been saved.'
# )
# self.individualMap = self.fullIndividualMap
# else:
# if self.fullData is None:
# raise exceptions.BiogemeError('Full data set has not been saved.')
# self.data = self.fullData
def add_column(self, expression: Expression, column: str) -> pd.Series:
    """Add a new column to the database, calculated from an expression.

    :param expression: expression to evaluate on each row.
    :param column: name of the column to add.
    :return: the newly added column.
    :raises ValueError: if the column name already exists.
    :raise BiogemeError: if the database is empty.
    """
    if len(self.data.index) == 0:
        error_msg = 'Database has no entry'
        raise BiogemeError(error_msg)
    if column in self.data.columns:
        raise ValueError(
            f'Column {column} already exists in the database {self.name}'
        )
    self._expression = expression
    computed_values = self._expression.get_value_c(
        database=self, aggregation=False, prepare_ids=True
    )
    self.data[column] = computed_values
    # Register the new column so it can be used in expressions.
    self.variables[column] = Variable(column)
    return self.data[column]
@deprecated
def addColumn(self, expression: Expression, column: str) -> pd.Series:
    """Deprecated alias of :meth:`add_column`."""
    pass
def define_variable(self, name: str, expression: Expression) -> Variable:
    """Insert a new column in the database and return it as a Variable.

    :param name: name of the new column.
    :param expression: expression used to compute the values of the column.
    :return: a Variable referring to the new column.
    """
    self.add_column(expression=expression, column=name)
    new_variable = Variable(name)
    return new_variable
@deprecated
def DefineVariable(self, name: str, expression: Expression) -> Variable:
    """Deprecated alias of :meth:`define_variable`."""
    pass
def remove(self, expression: ExpressionOrNumeric):
    """Remove from the database every entry for which the expression
    evaluates to a nonzero value.

    :param expression: expression to evaluate on each row.
    """
    flag_column = '__bioRemove__'
    condition = validate_and_convert(expression)
    # Materialize the condition as a temporary column.
    self.add_column(condition, flag_column)
    rows_to_drop = self.data[self.data[flag_column] != 0].index
    self.excludedData = len(rows_to_drop)
    self.data.drop(rows_to_drop, inplace=True)
    # Clean up the temporary column.
    self.data.drop(columns=[flag_column], inplace=True)
def check_segmentation(
    self, segmentation_tuple: DiscreteSegmentationTuple
) -> dict[str, int]:
    """Check that a discrete segmentation covers the complete database.

    :param segmentation_tuple: object describing the segmentation.
    :return: number of observations per segment, indexed by segment name.
    :raise BiogemeError: if a segment value is absent from the data, or
        if the data contains a value that defines no segment.
    """
    observed_counts = self.data[
        segmentation_tuple.variable.name
    ].value_counts()
    # Every declared segment value must appear in the data.
    for value, name in segmentation_tuple.mapping.items():
        if value not in observed_counts:
            error_msg = (
                f'Variable {segmentation_tuple.variable.name} does not '
                f'take the value {value} representing segment "{name}"'
            )
            raise BiogemeError(error_msg)
    # Every observed value must belong to a declared segment.
    for value, count in observed_counts.items():
        if value not in segmentation_tuple.mapping:
            error_msg = (
                f'Variable {segmentation_tuple.variable.name} '
                f'takes the value {value} [{count} times], and it does not '
                f'define any segment.'
            )
            raise BiogemeError(error_msg)
    return {
        name: observed_counts[value]
        for value, name in segmentation_tuple.mapping.items()
    }
def dump_on_file(self) -> str:
    """Dump the database into a tab-separated text file.

    :return: name of the created file.
    """
    dump_base_name = f'{self.name}_dumped'
    data_file_name = bf.get_new_file_name(dump_base_name, 'dat')
    self.data.to_csv(data_file_name, sep='\t', index_label='__rowId')
    logger.info(f'File {data_file_name} has been created')
    return data_file_name
@deprecated
def dumpOnFile(self) -> str:
    """Deprecated alias of :meth:`dump_on_file`."""
    pass
def set_random_number_generators(self, rng: dict[str, RandomNumberGeneratorTuple]):
    """Define user-provided random number generators.

    :param rng: dictionary of generators. Keys name the generators and
        must differ from the native generators pre-defined in Biogeme.
        Each value is a tuple: (0) a function taking the number of
        series (typically the database size) and the number of draws per
        series, returning the array of numbers; (1) a description.

    Example::

        def logNormalDraws(sample_size, number_of_draws):
            return np.exp(np.random.randn(sample_size, number_of_draws))

        def exponentialDraws(sample_size, number_of_draws):
            return -1.0 * np.log(np.random.rand(sample_size, number_of_draws))

        # We associate these functions with a name
        dict = {'LOGNORMAL':(logNormalDraws,
                             'Draws from lognormal distribution'),
                'EXP':(exponentialDraws,
                       'Draws from exponential distributions')}
        my_data.setRandomNumberGenerators(dict)

    :raise ValueError: if a reserved keyword is used for a
        user-defined generator.
    """
    # Reject any name that collides with a native generator.
    for reserved_keyword in native_random_number_generators:
        if reserved_keyword in rng:
            error_msg = (
                f'{reserved_keyword} is a reserved keyword for draws'
                f' and cannot be used for user-defined '
                f'generators'
            )
            raise ValueError(error_msg)
    self.userRandomNumberGenerators = rng
@deprecated
def setRandomNumberGenerators(
    self, rng: dict[str, tuple[RandomNumberGenerator, str]]
):
    """Deprecated alias of :meth:`set_random_number_generators`."""
    pass
def generate_draws(
    self,
    draw_types: dict[str, str],
    names: list[str],
    number_of_draws: int,
) -> np.ndarray:
    """Generate draws for each variable.

    :param draw_types: dict indexed by variable name, giving the type of
        draws for that variable. Each type is either native or defined
        by :func:`~biogeme.database.Database.setRandomNumberGenerators`.
        Native types: 'UNIFORM', 'UNIFORM_ANTI', 'UNIFORM_HALTON2',
        'UNIFORM_HALTON3', 'UNIFORM_HALTON5', 'UNIFORM_MLHS',
        'UNIFORM_MLHS_ANTI', 'UNIFORMSYM', 'UNIFORMSYM_ANTI',
        'UNIFORMSYM_HALTON2', 'UNIFORMSYM_HALTON3', 'UNIFORMSYM_HALTON5',
        'UNIFORMSYM_MLHS', 'UNIFORMSYM_MLHS_ANTI', 'NORMAL',
        'NORMAL_ANTI', 'NORMAL_HALTON2', 'NORMAL_HALTON3',
        'NORMAL_HALTON5', 'NORMAL_MLHS', 'NORMAL_MLHS_ANTI'.
        For an updated description of the native types, call
        :func:`~biogeme.database.Database.descriptionOfNativeDraws`.
    :param names: names of the variables that require draws.
    :param number_of_draws: number of draws to generate.
    :return: a 3-dimensional array of draws, with dimensions
        (number of individuals, number of draws, number of variables).

    Example::

        types = {'randomDraws1': 'NORMAL_MLHS_ANTI',
                 'randomDraws2': 'UNIFORM_MLHS_ANTI',
                 'randomDraws3': 'UNIFORMSYM_MLHS_ANTI'}
        theDrawsTable = my_data.generateDraws(types,
            ['randomDraws1', 'randomDraws2', 'randomDraws3'], 10)

    :raise BiogemeError: if a type of draws is unknown.
    :raise BiogemeError: if the output of the draw generator does not
        have the requested dimensions.
    """
    self.number_of_draws: int = number_of_draws
    sample_size = self.get_sample_size()
    # Collect one (sample_size, number_of_draws) array per variable.
    drawn_arrays = []
    for variable_name in names:
        draw_type: str = draw_types[variable_name]
        self.typesOfDraws[variable_name] = draw_type
        # Native generators take precedence; fall back to user-defined.
        the_generator: RandomNumberGeneratorTuple | None = (
            native_random_number_generators.get(draw_type)
        )
        if the_generator is None:
            the_generator = self.userRandomNumberGenerators.get(draw_type)
        if the_generator is None:
            user = self.userRandomNumberGenerators
            error_msg = (
                f'Unknown type of draws for '
                f'variable {variable_name}: {draw_type}. '
                f'Native types: {native_random_number_generators}. '
                f'User defined: {user}'
            )
            raise BiogemeError(error_msg)
        draws = the_generator.generator(sample_size, number_of_draws)
        if draws.shape != (sample_size, number_of_draws):
            error_msg = (
                f'The draw generator for {variable_name} must'
                f' generate a numpy array of dimensions'
                f' ({sample_size}, {number_of_draws})'
                f' instead of {draws.shape}'
            )
            raise BiogemeError(error_msg)
        drawn_arrays.append(draws)
    # Reorder the axes from (variable, individual, draw) to
    # (individual, draw, variable), better suited for calculation.
    self.theDraws = np.moveaxis(np.array(drawn_arrays), 0, -1)
    return self.theDraws
@deprecated
def generateDraws(
    self,
    types: dict[str, str],
    names: list[str],
    number_of_draws: int,
) -> np.ndarray:
    """Deprecated alias of :meth:`generate_draws`."""
    # Annotation fixed: generate_draws takes draw-type names (str),
    # not RandomNumberGeneratorTuple values.
    pass
def get_number_of_observations(self) -> int:
    """Report the number of observations (rows) in the database.

    The value is the same whether the data is panel or not.

    :return: number of observations.

    See also: get_sample_size()
    """
    return len(self.data.index)
@deprecated
def getNumberOfObservations(self) -> int:
    """Deprecated alias of :meth:`get_number_of_observations`."""
    pass
def get_sample_size(self) -> int:
    """Report the size of the sample.

    For cross-sectional data this is the number of observations; for
    panel data it is the number of individuals.

    :return: sample size.

    See also: get_number_of_observations()
    """
    relevant_frame = self.individualMap if self.is_panel() else self.data
    return relevant_frame.shape[0]
@deprecated
def getSampleSize(self) -> int:
    """Deprecated alias of :meth:`get_sample_size`."""
    pass
def split(
    self, slices: int, groups: str | None = None
) -> list[EstimationValidation]:
    """Prepare estimation and validation sets for cross-validation.

    :param slices: number of slices.
    :param groups: name of the column defining group IDs. Rows of the
        same group are kept together in the same slice.
    :return: list of estimation/validation data set pairs.
    :raise BiogemeError: if the number of slices is less than two, or if
        a grouping conflicts with an existing panel structure.
    """
    if slices < 2:
        error_msg = (
            f'The number of slices is {slices}. It must be greater '
            f'or equal to 2.'
        )
        raise BiogemeError(error_msg)
    if groups is not None and self.is_panel():
        if groups != self.panelColumn:
            error_msg = (
                f'The data is already organized by groups on '
                f'{self.panelColumn}. The grouping by {groups} '
                f'cannot be done.'
            )
            raise BiogemeError(error_msg)
    if self.is_panel():
        groups = self.panelColumn
    if groups is None:
        # No grouping: shuffle the rows and slice them directly.
        shuffled_rows = self.data.sample(frac=1)
        data_slices = np.array_split(shuffled_rows, slices)
    else:
        # Shuffle the group identifiers, slice those, then gather the
        # rows of each group so groups are never split across slices.
        group_ids = self.data[groups].unique()
        np.random.shuffle(group_ids)
        data_slices = [
            self.data[self.data[groups].isin(chunk)]
            for chunk in np.array_split(group_ids, slices)
        ]
    result = []
    for index, validation_part in enumerate(data_slices):
        # Estimation set: all slices except the held-out one.
        estimation_part = pd.concat(
            data_slices[:index] + data_slices[index + 1 :]
        )
        result.append(
            EstimationValidation(
                estimation=estimation_part, validation=validation_part
            )
        )
    return result
def is_panel(self) -> bool:
    """Tell whether the database holds panel data.

    :return: True if a panel column has been defined.
    """
    has_panel_column = self.panelColumn is not None
    return has_panel_column
@deprecated
def isPanel(self) -> bool:
    """Deprecated alias of :meth:`is_panel`."""
    pass
def panel(self, column_name: str):
    """Declare the data as panel data.

    :param column_name: name of the column identifying individuals.
    :raise BiogemeError: if the rows belonging to one individual are not
        consecutive in the data.
    """
    self.panelColumn = column_name
    # Rows of each individual must be contiguous. If sorting by the
    # panel column changes the number of groups, they were not.
    observed_groups = biogeme.tools.count_number_of_groups(
        self.data, self.panelColumn
    )
    sorted_data = self.data.sort_values(by=[self.panelColumn])
    actual_individuals = biogeme.tools.count_number_of_groups(
        sorted_data, self.panelColumn
    )
    if observed_groups != actual_individuals:
        the_error = (
            f'The data must be sorted so that the data'
            f' for the same individual are consecutive.'
            f' There are {actual_individuals} individuals '
            f'in the sample, and {observed_groups} groups of '
            f'data for column {self.panelColumn}.'
        )
        raise BiogemeError(the_error)
    self.build_panel_map()
def build_panel_map(self) -> None:
    """Sort the data so that the observations of each individual are
    contiguous, and build a map giving, for each individual, the range
    of row indices of their observations.
    """
    if self.panelColumn is None:
        return
    self.data = self.data.sort_values(by=self.panelColumn)
    # Renumber the rows to reflect the new ordering.
    self.data.index = range(len(self.data.index))
    local_map = {}
    for individual in self.data[self.panelColumn].unique():
        row_indices = self.data.loc[
            self.data[self.panelColumn] == individual
        ].index
        local_map[individual] = [min(row_indices), max(row_indices)]
    self.individualMap = pd.DataFrame(local_map).T
    self.fullIndividualMap = self.individualMap
@deprecated
def buildPanelMap(self) -> None:
    """Deprecated alias of :meth:`build_panel_map`."""
    pass
def count(self, column_name: str, value: float) -> int: