diff --git a/sdmetrics/single_column/statistical/kscomplement.py b/sdmetrics/single_column/statistical/kscomplement.py
index 3be01330..525e85c7 100644
--- a/sdmetrics/single_column/statistical/kscomplement.py
+++ b/sdmetrics/single_column/statistical/kscomplement.py
@@ -1,5 +1,6 @@
 """Kolmogorov-Smirnov test based Metric."""
 
+import numpy as np
 import pandas as pd
 from scipy.stats import ks_2samp
 
@@ -56,7 +57,15 @@ def compute(real_data, synthetic_data):
             real_data = pd.to_numeric(real_data)
             synthetic_data = pd.to_numeric(synthetic_data)
 
-        statistic, _ = ks_2samp(real_data, synthetic_data)
+        try:
+            statistic, _ = ks_2samp(real_data, synthetic_data)
+        except ValueError as e:
+            # An empty sample has no distribution to compare, so the score is undefined.
+            if str(e) == 'Data passed to ks_2samp must not be empty':
+                return np.nan
+
+            # Any other failure is unexpected: propagate it with its original traceback.
+            raise
 
         return 1 - statistic
 
diff --git a/sdmetrics/timeseries/__init__.py b/sdmetrics/timeseries/__init__.py
index 6a09b529..584bcbf9 100644
--- a/sdmetrics/timeseries/__init__.py
+++ b/sdmetrics/timeseries/__init__.py
@@ -5,6 +5,7 @@
 from sdmetrics.timeseries.detection import LSTMDetection, TimeSeriesDetectionMetric
 from sdmetrics.timeseries.efficacy import TimeSeriesEfficacyMetric
 from sdmetrics.timeseries.efficacy.classification import LSTMClassifierEfficacy
+from sdmetrics.timeseries.inter_row_msas import InterRowMSAS
 
 __all__ = [
     'base',
@@ -16,4 +17,5 @@
     'LSTMDetection',
     'TimeSeriesEfficacyMetric',
     'LSTMClassifierEfficacy',
+    'InterRowMSAS',
 ]
diff --git a/sdmetrics/timeseries/inter_row_msas.py b/sdmetrics/timeseries/inter_row_msas.py
new file mode 100644
index 00000000..eea77f06
--- /dev/null
+++ b/sdmetrics/timeseries/inter_row_msas.py
@@ -0,0 +1,118 @@
+"""InterRowMSAS module."""
+
+import warnings
+
+import numpy as np
+import pandas as pd
+
+from sdmetrics.goal import Goal
+from sdmetrics.single_column.statistical.kscomplement import KSComplement
+
+
+class InterRowMSAS:
+    """Inter-Row Multi-Sequence Aggregate Similarity (MSAS) metric.
+
+    Attributes:
+        name (str):
+            Name to use when reports about this metric are printed.
+        goal (sdmetrics.goal.Goal):
+            The goal of this metric.
+        min_value (Union[float, tuple[float]]):
+            Minimum value or values that this metric can take.
+        max_value (Union[float, tuple[float]]):
+            Maximum value or values that this metric can take.
+    """
+
+    name = 'Inter-Row Multi-Sequence Aggregate Similarity'
+    goal = Goal.MAXIMIZE
+    min_value = 0.0
+    max_value = 1.0
+
+    @staticmethod
+    def compute(real_data, synthetic_data, n_rows_diff=1, apply_log=False):
+        """Compute this metric.
+
+        This metric compares the inter-row differences of sequences in the real data
+        vs. the synthetic data.
+
+        It works as follows:
+            - Calculate the difference between row r and row r+x for each row in the real data
+            - Take the average over each sequence to form a distribution D_r
+            - Do the same for the synthetic data to form a new distribution D_s
+            - Apply the KSComplement metric to compare the similarities of (D_r, D_s)
+            - Return this score
+
+        Args:
+            real_data (tuple[pd.Series, pd.Series]):
+                A tuple of 2 pandas.Series objects. The first represents the sequence key
+                of the real data and the second represents a continuous column of data.
+            synthetic_data (tuple[pd.Series, pd.Series]):
+                A tuple of 2 pandas.Series objects. The first represents the sequence key
+                of the synthetic data and the second represents a continuous column of data.
+            n_rows_diff (int):
+                An integer representing the number of rows to consider when taking the difference.
+            apply_log (bool):
+                Whether to apply a natural log before taking the difference. Non-positive
+                values have no finite logarithm, so they should be avoided when enabled.
+
+        Returns:
+            float:
+                The similarity score between the real and synthetic data distributions.
+                NaN is returned when there are no inter-row differences to compare, e.g.
+                when every sequence has at most ``n_rows_diff`` rows.
+
+        Raises:
+            ValueError:
+                If the data is not a tuple of two pandas.Series, if ``n_rows_diff`` is not
+                an integer greater than zero or if ``apply_log`` is not a boolean.
+        """
+        for data in [real_data, synthetic_data]:
+            if (
+                not isinstance(data, tuple)
+                or len(data) != 2
+                or (not (isinstance(data[0], pd.Series) and isinstance(data[1], pd.Series)))
+            ):
+                raise ValueError('The data must be a tuple of two pandas series.')
+
+        if not isinstance(n_rows_diff, int) or n_rows_diff < 1:
+            raise ValueError("'n_rows_diff' must be an integer greater than zero.")
+
+        if not isinstance(apply_log, bool):
+            raise ValueError("'apply_log' must be a boolean.")
+
+        real_keys, real_values = real_data
+        synthetic_keys, synthetic_values = synthetic_data
+
+        if apply_log:
+            real_values = np.log(real_values)
+            synthetic_values = np.log(synthetic_values)
+
+        def calculate_differences(keys, values, n_rows_diff, data_name):
+            # Group once and reuse the grouping for the size check and the averaging.
+            grouped = values.groupby(keys)
+            group_sizes = grouped.size()
+            num_invalid_groups = group_sizes[group_sizes <= n_rows_diff].count()
+            if num_invalid_groups > 0:
+                warnings.warn(
+                    f"n_rows_diff '{n_rows_diff}' is greater than or equal to the "
+                    f'size of {num_invalid_groups} sequence keys in {data_name}.'
+                )
+
+            def mean_difference(group):
+                # A sequence with at most n_rows_diff rows has no pair of rows to compare.
+                if len(group) <= n_rows_diff:
+                    return np.nan
+
+                sequence = group.to_numpy()
+                return np.mean(sequence[n_rows_diff:] - sequence[:-n_rows_diff])
+
+            differences = grouped.apply(mean_difference)
+
+            return pd.Series(differences)
+
+        real_diff = calculate_differences(real_keys, real_values, n_rows_diff, 'real_data')
+        synthetic_diff = calculate_differences(
+            synthetic_keys, synthetic_values, n_rows_diff, 'synthetic_data'
+        )
+
+        return KSComplement.compute(real_diff, synthetic_diff)
diff --git a/tests/unit/timeseries/test_inter_row_msas.py b/tests/unit/timeseries/test_inter_row_msas.py
new file mode 100644
index 00000000..14101079
--- /dev/null
+++ b/tests/unit/timeseries/test_inter_row_msas.py
@@ -0,0 +1,179 @@
+import pandas as pd
+import pytest
+
+from sdmetrics.timeseries.inter_row_msas import InterRowMSAS
+
+
+class TestInterRowMSAS:
+    def test_compute(self):
+        """Test it runs."""
+        # Setup
+        real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2'])
+        real_values = pd.Series([1, 2, 3, 4, 5, 6])
+        synthetic_keys = pd.Series(['id3', 'id3', 'id3', 'id4', 'id4', 'id4'])
+        synthetic_values = pd.Series([1, 10, 3, 7, 5, 1])
+
+        # Run
+        score = InterRowMSAS.compute(
+            real_data=(real_keys, real_values), synthetic_data=(synthetic_keys, synthetic_values)
+        )
+
+        # Assert
+        assert score == 0.5
+
+    def test_compute_identical_sequences(self):
+        """Test it returns 1 when real and synthetic data are identical."""
+        # Setup
+        real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2'])
+        real_values = pd.Series([1, 2, 3, 4, 5, 6])
+        synthetic_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2'])
+        synthetic_values = pd.Series([1, 2, 3, 4, 5, 6])
+
+        # Run
+        score = InterRowMSAS.compute(
+            real_data=(real_keys, real_values), synthetic_data=(synthetic_keys, synthetic_values)
+        )
+
+        # Assert
+        assert score == 1
+
+    def test_compute_different_sequences(self):
+        """Test it for distinct distributions."""
+        # Setup
+        real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2'])
+        real_values = pd.Series([1, 2, 3, 4, 5, 6])
+        synthetic_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2'])
+        synthetic_values = pd.Series([1, 3, 5, 2, 4, 6])
+
+        # Run
+        score = InterRowMSAS.compute(
+            real_data=(real_keys, real_values), synthetic_data=(synthetic_keys, synthetic_values)
+        )
+
+        # Assert
+        assert score == 0
+
+    def test_compute_with_log(self):
+        """Test it with logarithmic transformation."""
+        # Setup
+        real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2'])
+        real_values = pd.Series([1, 2, 4, 8, 16, 32])
+        synthetic_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2'])
+        synthetic_values = pd.Series([1, 2, 4, 8, 16, 32])
+
+        # Run
+        score = InterRowMSAS.compute(
+            real_data=(real_keys, real_values),
+            synthetic_data=(synthetic_keys, synthetic_values),
+            apply_log=True,
+        )
+
+        # Assert
+        assert score == 1
+
+    def test_compute_different_n_rows_diff(self):
+        """Test it with different n_rows_diff."""
+        # Setup
+        real_keys = pd.Series(['id1'] * 10 + ['id2'] * 10)
+        real_values = pd.Series(list(range(10)) + list(range(10)))
+        synthetic_keys = pd.Series(['id1'] * 10 + ['id2'] * 10)
+        synthetic_values = pd.Series(list(range(10)) + list(range(10)))
+
+        # Run
+        score = InterRowMSAS.compute(
+            real_data=(real_keys, real_values),
+            synthetic_data=(synthetic_keys, synthetic_values),
+            n_rows_diff=3,
+        )
+
+        # Assert
+        assert score == 1
+
+    def test_compute_invalid_real_data(self):
+        """Test that it raises ValueError when real_data is invalid."""
+        # Setup
+        real_data = [[1, 2, 3], [4, 5, 6]]  # Not a tuple of pandas Series
+        synthetic_keys = pd.Series(['id1', 'id1', 'id2', 'id2'])
+        synthetic_values = pd.Series([1, 2, 3, 4])
+
+        # Run and Assert
+        with pytest.raises(ValueError, match='The data must be a tuple of two pandas series.'):
+            InterRowMSAS.compute(
+                real_data=real_data,
+                synthetic_data=(synthetic_keys, synthetic_values),
+                n_rows_diff=1,
+                apply_log=False,
+            )
+
+    def test_compute_invalid_synthetic_data(self):
+        """Test that it raises ValueError when synthetic_data is invalid."""
+        # Setup
+        real_keys = pd.Series(['id1', 'id1', 'id2', 'id2'])
+        real_values = pd.Series([1, 2, 3, 4])
+        synthetic_data = [[1, 2, 3], [4, 5, 6]]  # Not a tuple of pandas Series
+
+        # Run and Assert
+        with pytest.raises(ValueError, match='The data must be a tuple of two pandas series.'):
+            InterRowMSAS.compute(
+                real_data=(real_keys, real_values),
+                synthetic_data=synthetic_data,
+                n_rows_diff=1,
+                apply_log=False,
+            )
+
+    def test_compute_invalid_n_rows_diff(self):
+        """Test that it raises ValueError when n_rows_diff is invalid."""
+        # Setup
+        real_keys = pd.Series(['id1', 'id1', 'id2', 'id2'])
+        real_values = pd.Series([1, 2, 3, 4])
+        synthetic_keys = pd.Series(['id3', 'id3', 'id4', 'id4'])
+        synthetic_values = pd.Series([1, 2, 3, 4])
+
+        # Run and Assert
+        with pytest.raises(ValueError, match="'n_rows_diff' must be an integer greater than zero."):
+            InterRowMSAS.compute(
+                real_data=(real_keys, real_values),
+                synthetic_data=(synthetic_keys, synthetic_values),
+                n_rows_diff=0,
+                apply_log=False,
+            )
+
+    def test_compute_invalid_apply_log(self):
+        """Test that it raises ValueError when apply_log is invalid."""
+        # Setup
+        real_keys = pd.Series(['id1', 'id1', 'id2', 'id2'])
+        real_values = pd.Series([1, 2, 3, 4])
+        synthetic_keys = pd.Series(['id1', 'id1', 'id2', 'id2'])
+        synthetic_values = pd.Series([1, 2, 3, 4])
+
+        # Run and Assert
+        with pytest.raises(ValueError, match="'apply_log' must be a boolean."):
+            InterRowMSAS.compute(
+                real_data=(real_keys, real_values),
+                synthetic_data=(synthetic_keys, synthetic_values),
+                n_rows_diff=1,
+                apply_log='True',  # Should be a boolean, not a string
+            )
+
+    def test_compute_warning(self):
+        """Test a warning is raised when n_rows_diff is greater than sequence values size."""
+        # Setup
+        real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2'])
+        real_values = pd.Series([1, 2, 3, 4, 5, 6])
+        synthetic_keys = pd.Series(['id3', 'id3', 'id3', 'id4', 'id4', 'id4'])
+        synthetic_values = pd.Series([1, 10, 3, 7, 5, 1])
+
+        # Run and Assert
+        warn_msg = (
+            "n_rows_diff '10' is greater than or equal to the size of "
+            '2 sequence keys in real_data.'
+        )
+        with pytest.warns(UserWarning, match=warn_msg):
+            score = InterRowMSAS.compute(
+                real_data=(real_keys, real_values),
+                synthetic_data=(synthetic_keys, synthetic_values),
+                n_rows_diff=10,
+            )
+
+        # Assert
+        assert pd.isna(score)