From 4530b8c46443ec1a83eb25f8b45afefa57329693 Mon Sep 17 00:00:00 2001 From: Kevin Sheppard Date: Fri, 4 Sep 2020 10:33:40 +0100 Subject: [PATCH] BUG: Ensure bestlag is defined in autolag Ensure that it is always defined even when all t-stats are small closes #7014 --- statsmodels/tsa/stattools.py | 39 +- statsmodels/tsa/tests/test_stattools.py | 612 +++++++++++++++--------- 2 files changed, 418 insertions(+), 233 deletions(-) diff --git a/statsmodels/tsa/stattools.py b/statsmodels/tsa/stattools.py index 6ad51363ab3..3adca469733 100644 --- a/statsmodels/tsa/stattools.py +++ b/statsmodels/tsa/stattools.py @@ -130,14 +130,17 @@ def _autolag( elif method == "t-stat": # stop = stats.norm.ppf(.95) stop = 1.6448536269514722 + # Default values to ensure that always set + bestlag = startlag + maxlag + icbest = 0.0 for lag in range(startlag + maxlag, startlag - 1, -1): icbest = np.abs(results[lag].tvalues[-1]) + bestlag = lag if np.abs(icbest) >= stop: - bestlag = lag - icbest = icbest + # Break for first lag with a significant t-stat break else: - raise ValueError("Information Criterion %s not understood.") % method + raise ValueError(f"Information Criterion {method} not understood.") if not regresults: return icbest, bestlag @@ -976,18 +979,21 @@ def pacf(x, nlags=None, method="ywadjusted", alpha=None): consistently worse than the other options. """ nlags = int_like(nlags, "nlags", optional=True) - renames = {"ydu":"yda", - "ywu": "ywa", - "ywunbiased":"ywadjusted", - "ldunbiased":"ldadjusted", - "ld_unbiased":"ld_adjusted", - "ldu":"lda", - "ols-unbiased":"ols-adjusted"} + renames = { + "ydu": "yda", + "ywu": "ywa", + "ywunbiased": "ywadjusted", + "ldunbiased": "ldadjusted", + "ld_unbiased": "ld_adjusted", + "ldu": "lda", + "ols-unbiased": "ols-adjusted", + } if method in renames: warnings.warn( f"{method} has been renamed {renames[method]}. After release 0.13, " "using the old name will raise.", - FutureWarning) + FutureWarning, + ) method = renames[method] methods = ( "ols", @@ -1347,7 +1353,8 @@ def grangercausalitytests(x, maxlag, addconst=True, verbose=True): if lags.min() <= 0 or lags.size == 0: raise ValueError( "maxlag must be a non-empty list containing only " - "positive integers") + "positive integers" + ) if x.shape[0] <= 3 * maxlag + int(addconst): raise ValueError( @@ -1871,9 +1878,13 @@ def kpss(x, regression="c", nlags=None, store=False): look-up table. The actual p-value is {direction} than the p-value returned. """ if p_value == pvals[-1]: - warnings.warn(warn_msg.format(direction="smaller"), InterpolationWarning) + warnings.warn( + warn_msg.format(direction="smaller"), InterpolationWarning + ) elif p_value == pvals[0]: - warnings.warn(warn_msg.format(direction="greater"), InterpolationWarning) + warnings.warn( + warn_msg.format(direction="greater"), InterpolationWarning + ) crit_dict = {"10%": crit[0], "5%": crit[1], "2.5%": crit[2], "1%": crit[3]} diff --git a/statsmodels/tsa/tests/test_stattools.py b/statsmodels/tsa/tests/test_stattools.py index b30bf6498b2..bca0460109f 100644 --- a/statsmodels/tsa/tests/test_stattools.py +++ b/statsmodels/tsa/tests/test_stattools.py @@ -4,8 +4,13 @@ import numpy as np import pandas as pd import pytest -from numpy.testing import (assert_almost_equal, assert_equal, assert_raises, - assert_, assert_allclose) +from numpy.testing import ( + assert_almost_equal, + assert_equal, + assert_raises, + assert_, + assert_allclose, +) from pandas import Series, date_range, DataFrame from statsmodels.compat.numpy import lstsq @@ -13,18 +18,31 @@ from statsmodels.compat.platform import PLATFORM_WIN from statsmodels.compat.python import lrange from statsmodels.datasets import macrodata, sunspots, nile, randhie, modechoice -from statsmodels.tools.sm_exceptions import (CollinearityWarning, - MissingDataError, - InterpolationWarning) +from statsmodels.tools.sm_exceptions import ( + CollinearityWarning, + MissingDataError, + InterpolationWarning, +) from statsmodels.tsa.arima_process import arma_acovf from statsmodels.tsa.statespace.sarimax import SARIMAX -from statsmodels.tsa.stattools import (adfuller, acf, pacf_yw, pacf_ols, - pacf, grangercausalitytests, - coint, acovf, kpss, - arma_order_select_ic, levinson_durbin, - levinson_durbin_pacf, pacf_burg, - innovations_algo, innovations_filter, - zivot_andrews) +from statsmodels.tsa.stattools import ( + adfuller, + acf, + pacf_yw, + pacf_ols, + pacf, + grangercausalitytests, + coint, + acovf, + kpss, + arma_order_select_ic, + levinson_durbin, + levinson_durbin_pacf, + pacf_burg, + innovations_algo, + innovations_filter, + zivot_andrews, +) DECIMAL_8 = 8 DECIMAL_6 = 6 @@ -37,7 +55,7 @@ CURR_DIR = os.path.dirname(os.path.abspath(__file__)) -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def acovf_data(): rnd = np.random.RandomState(12345) return rnd.randn(250) @@ -49,10 +67,11 @@ class CheckADF(object): Test values taken from Stata. """ - levels = ['1%', '5%', '10%'] + + levels = ["1%", "5%", "10%"] data = macrodata.load_pandas() - x = data.data['realgdp'].values - y = data.data['infl'].values + x = data.data["realgdp"].values + y = data.data["infl"].values def test_teststat(self): assert_almost_equal(self.res1[0], self.teststat, DECIMAL_5) @@ -72,23 +91,20 @@ class TestADFConstant(CheckADF): @classmethod def setup_class(cls): - cls.res1 = adfuller(cls.x, regression="c", autolag=None, - maxlag=4) - cls.teststat = .97505319 - cls.pvalue = .99399563 + cls.res1 = adfuller(cls.x, regression="c", autolag=None, maxlag=4) + cls.teststat = 0.97505319 + cls.pvalue = 0.99399563 cls.critvalues = [-3.476, -2.883, -2.573] class TestADFConstantTrend(CheckADF): - """ - """ + """""" @classmethod def setup_class(cls): - cls.res1 = adfuller(cls.x, regression="ct", autolag=None, - maxlag=4) + cls.res1 = adfuller(cls.x, regression="ct", autolag=None, maxlag=4) cls.teststat = -1.8566374 - cls.pvalue = .67682968 + cls.pvalue = 0.67682968 cls.critvalues = [-4.007, -3.437, -3.137] @@ -101,16 +117,14 @@ def setup_class(cls): class TestADFNoConstant(CheckADF): - """ - """ + """""" @classmethod def setup_class(cls): - cls.res1 = adfuller(cls.x, regression="nc", autolag=None, - maxlag=4) + cls.res1 = adfuller(cls.x, regression="nc", autolag=None, maxlag=4) cls.teststat = 3.5227498 - cls.pvalue = .99999 + cls.pvalue = 0.99999 # Stata does not return a p-value for noconstant. # Tau^max in MacKinnon (1994) is missing, so it is # assumed that its right-tail is well-behaved @@ -120,51 +134,53 @@ def setup_class(cls): # No Unit Root + class TestADFConstant2(CheckADF): @classmethod def setup_class(cls): - cls.res1 = adfuller(cls.y, regression="c", autolag=None, - maxlag=1) + cls.res1 = adfuller(cls.y, regression="c", autolag=None, maxlag=1) cls.teststat = -4.3346988 - cls.pvalue = .00038661 + cls.pvalue = 0.00038661 cls.critvalues = [-3.476, -2.883, -2.573] class TestADFConstantTrend2(CheckADF): @classmethod def setup_class(cls): - cls.res1 = adfuller(cls.y, regression="ct", autolag=None, - maxlag=1) + cls.res1 = adfuller(cls.y, regression="ct", autolag=None, maxlag=1) cls.teststat = -4.425093 - cls.pvalue = .00199633 + cls.pvalue = 0.00199633 cls.critvalues = [-4.006, -3.437, -3.137] class TestADFNoConstant2(CheckADF): @classmethod def setup_class(cls): - cls.res1 = adfuller(cls.y, regression="nc", autolag=None, - maxlag=1) + cls.res1 = adfuller(cls.y, regression="nc", autolag=None, maxlag=1) cls.teststat = -2.4511596 cls.pvalue = 0.013747 # Stata does not return a p-value for noconstant # this value is just taken from our results cls.critvalues = [-2.587, -1.950, -1.617] - _, _1, _2, cls.store = adfuller(cls.y, regression="nc", autolag=None, - maxlag=1, store=True) + _, _1, _2, cls.store = adfuller( + cls.y, regression="nc", autolag=None, maxlag=1, store=True + ) def test_store_str(self): - assert_equal(self.store.__str__(), 'Augmented Dickey-Fuller Test Results') + assert_equal( + self.store.__str__(), "Augmented Dickey-Fuller Test Results" + ) class CheckCorrGram(object): """ Set up for ACF, PACF tests. """ + data = macrodata.load_pandas() - x = data.data['realgdp'] - filename = os.path.join(CURR_DIR, 'results', 'results_corrgram.csv') - results = pd.read_csv(filename, delimiter=',') + x = data.data["realgdp"] + filename = os.path.join(CURR_DIR, "results", "results_corrgram.csv") + results = pd.read_csv(filename, delimiter=",") class TestACF(CheckCorrGram): @@ -174,11 +190,11 @@ class TestACF(CheckCorrGram): @classmethod def setup_class(cls): - cls.acf = cls.results['acvar'] + cls.acf = cls.results["acvar"] # cls.acf = np.concatenate(([1.], cls.acf)) - cls.qstat = cls.results['Q1'] - cls.res1 = acf(cls.x, nlags=40, qstat=True, alpha=.05, fft=False) - cls.confint_res = cls.results[['acvar_lb', 'acvar_ub']].values + cls.qstat = cls.results["Q1"] + cls.res1 = acf(cls.x, nlags=40, qstat=True, alpha=0.05, fft=False) + cls.confint_res = cls.results[["acvar_lb", "acvar_ub"]].values def test_acf(self): assert_almost_equal(self.res1[0][1:41], self.acf, DECIMAL_8) @@ -201,8 +217,8 @@ class TestACF_FFT(CheckCorrGram): # Test Autocorrelation Function using FFT @classmethod def setup_class(cls): - cls.acf = cls.results['acvarfft'] - cls.qstat = cls.results['Q1'] + cls.acf = cls.results["acvarfft"] + cls.qstat = cls.results["Q1"] cls.res1 = acf(cls.x, nlags=40, qstat=True, fft=True) def test_acf(self): @@ -218,21 +234,35 @@ class TestACFMissing(CheckCorrGram): @classmethod def setup_class(cls): cls.x = np.concatenate((np.array([np.nan]), cls.x)) - cls.acf = cls.results['acvar'] # drop and conservative - cls.qstat = cls.results['Q1'] - cls.res_drop = acf(cls.x, nlags=40, qstat=True, alpha=.05, - missing='drop', fft=False) - cls.res_conservative = acf(cls.x, nlags=40, qstat=True, alpha=.05, - fft=False, missing='conservative') + cls.acf = cls.results["acvar"] # drop and conservative + cls.qstat = cls.results["Q1"] + cls.res_drop = acf( + cls.x, nlags=40, qstat=True, alpha=0.05, missing="drop", fft=False + ) + cls.res_conservative = acf( + cls.x, + nlags=40, + qstat=True, + alpha=0.05, + fft=False, + missing="conservative", + ) cls.acf_none = np.empty(40) * np.nan # lags 1 to 40 inclusive cls.qstat_none = np.empty(40) * np.nan - cls.res_none = acf(cls.x, nlags=40, qstat=True, alpha=.05, - missing='none', fft=False) + cls.res_none = acf( + cls.x, nlags=40, qstat=True, alpha=0.05, missing="none", fft=False + ) def test_raise(self): with pytest.raises(MissingDataError): - acf(self.x, nlags=40, qstat=True, fft=False, alpha=.05, - missing='raise') + acf( + self.x, + nlags=40, + qstat=True, + fft=False, + alpha=0.05, + missing="raise", + ) def test_acf_none(self): assert_almost_equal(self.res_none[0][1:41], self.acf_none, DECIMAL_8) @@ -241,8 +271,9 @@ def test_acf_drop(self): assert_almost_equal(self.res_drop[0][1:41], self.acf, DECIMAL_8) def test_acf_conservative(self): - assert_almost_equal(self.res_conservative[0][1:41], self.acf, - DECIMAL_8) + assert_almost_equal( + self.res_conservative[0][1:41], self.acf, DECIMAL_8 + ) def test_qstat_none(self): # todo why is res1/qstat 1 short @@ -259,18 +290,18 @@ def test_qstat_none(self): class TestPACF(CheckCorrGram): @classmethod def setup_class(cls): - cls.pacfols = cls.results['PACOLS'] - cls.pacfyw = cls.results['PACYW'] + cls.pacfols = cls.results["PACOLS"] + cls.pacfyw = cls.results["PACYW"] def test_ols(self): - pacfols, confint = pacf(self.x, nlags=40, alpha=.05, method="ols") + pacfols, confint = pacf(self.x, nlags=40, alpha=0.05, method="ols") assert_almost_equal(pacfols[1:], self.pacfols, DECIMAL_6) centered = confint - confint.mean(1)[:, None] # from edited Stata ado file - res = [[-.1375625, .1375625]] * 40 + res = [[-0.1375625, 0.1375625]] * 40 assert_almost_equal(centered[1:41], res, DECIMAL_6) # check lag 0 - assert_equal(centered[0], [0., 0.]) + assert_equal(centered[0], [0.0, 0.0]) assert_equal(confint[0], [1, 1]) assert_equal(pacfols[0], 1) @@ -285,8 +316,8 @@ def test_ols_inefficient(self): direct = np.empty(lag_len + 1) direct[0] = 1.0 for i in range(lag_len): - lags[:, i] = x[5 - (i + 1):-(i + 1)] - direct[i + 1] = lstsq(lags[:, :(i + 1)], lead, rcond=None)[0][-1] + lags[:, i] = x[5 - (i + 1) : -(i + 1)] + direct[i + 1] = lstsq(lags[:, : (i + 1)], lead, rcond=None)[0][-1] assert_allclose(pacfols, direct, atol=1e-8) def test_yw(self): @@ -309,10 +340,11 @@ class CheckCoint(object): Test values taken from Stata """ - levels = ['1%', '5%', '10%'] + + levels = ["1%", "5%", "10%"] data = macrodata.load_pandas() - y1 = data.data['realcons'].values - y2 = data.data['realgdp'].values + y1 = data.data["realcons"].values + y2 = data.data["realgdp"].values def test_tstat(self): assert_almost_equal(self.coint_t, self.teststat, DECIMAL_4) @@ -327,7 +359,9 @@ class TestCoint_t(CheckCoint): @classmethod def setup_class(cls): # cls.coint_t = coint(cls.y1, cls.y2, trend="c")[0] - cls.coint_t = coint(cls.y1, cls.y2, trend="c", maxlag=0, autolag=None)[0] + cls.coint_t = coint(cls.y1, cls.y2, trend="c", maxlag=0, autolag=None)[ + 0 + ] cls.teststat = -1.8208817 cls.teststat = -1.830170986148 @@ -345,7 +379,7 @@ def test_coint(): # FIXME: enable/xfail/skip or delete for trend in []: # ['c', 'ct', 'ctt', 'nc']: - print('\n', trend) + print("\n", trend) print(coint(y[:, 0], y[:, 1], trend=trend, maxlag=4, autolag=None)) print(coint(y[:, 0], y[:, 1:3], trend=trend, maxlag=4, autolag=None)) print(coint(y[:, 0], y[:, 2:], trend=trend, maxlag=4, autolag=None)) @@ -354,44 +388,90 @@ def test_coint(): # results from Stata egranger res_egranger = {} # trend = 'ct' - res = res_egranger['ct'] = {} - res[0] = [-5.615251442239, -4.406102369132, -3.82866685109, -3.532082997903] - res[1] = [-5.63591313706, -4.758609717199, -4.179130554708, -3.880909696863] - res[2] = [-2.892029275027, -4.758609717199, -4.179130554708, -3.880909696863] + res = res_egranger["ct"] = {} + res[0] = [ + -5.615251442239, + -4.406102369132, + -3.82866685109, + -3.532082997903, + ] + res[1] = [ + -5.63591313706, + -4.758609717199, + -4.179130554708, + -3.880909696863, + ] + res[2] = [ + -2.892029275027, + -4.758609717199, + -4.179130554708, + -3.880909696863, + ] res[3] = [-5.626932544079, -5.08363327039, -4.502469783057, -4.2031051091] # trend = 'c' - res = res_egranger['c'] = {} + res = res_egranger["c"] = {} # first critical value res[0][1] has a discrepancy starting at 4th decimal - res[0] = [-5.760696844656, -3.952043522638, -3.367006313729, -3.065831247948] + res[0] = [ + -5.760696844656, + -3.952043522638, + -3.367006313729, + -3.065831247948, + ] # manually adjusted to have higher precision as in other cases res[0][1] = -3.952321293401682 - res[1] = [-5.781087068772, -4.367111915942, -3.783961136005, -3.483501524709] - res[2] = [-2.477444137366, -4.367111915942, -3.783961136005, -3.483501524709] - res[3] = [-5.778205811661, -4.735249216434, -4.152738973763, -3.852480848968] + res[1] = [ + -5.781087068772, + -4.367111915942, + -3.783961136005, + -3.483501524709, + ] + res[2] = [ + -2.477444137366, + -4.367111915942, + -3.783961136005, + -3.483501524709, + ] + res[3] = [ + -5.778205811661, + -4.735249216434, + -4.152738973763, + -3.852480848968, + ] # trend = 'ctt' - res = res_egranger['ctt'] = {} - res[0] = [-5.644431269946, -4.796038299708, -4.221469431008, -3.926472577178] + res = res_egranger["ctt"] = {} + res[0] = [ + -5.644431269946, + -4.796038299708, + -4.221469431008, + -3.926472577178, + ] res[1] = [-5.665691609506, -5.111158174219, -4.53317278104, -4.23601008516] res[2] = [-3.161462374828, -5.111158174219, -4.53317278104, -4.23601008516] - res[3] = [-5.657904558563, -5.406880189412, -4.826111619543, -4.527090164875] + res[3] = [ + -5.657904558563, + -5.406880189412, + -4.826111619543, + -4.527090164875, + ] # The following for 'nc' are only regression test numbers # trend = 'nc' not allowed in egranger # trend = 'nc' - res = res_egranger['nc'] = {} + res = res_egranger["nc"] = {} nan = np.nan # shortcut for table res[0] = [-3.7146175989071137, nan, nan, nan] res[1] = [-3.8199323012888384, nan, nan, nan] res[2] = [-1.6865000791270679, nan, nan, nan] res[3] = [-3.7991270451873675, nan, nan, nan] - for trend in ['c', 'ct', 'ctt', 'nc']: + for trend in ["c", "ct", "ctt", "nc"]: res1 = {} res1[0] = coint(y[:, 0], y[:, 1], trend=trend, maxlag=4, autolag=None) - res1[1] = coint(y[:, 0], y[:, 1:3], trend=trend, maxlag=4, - autolag=None) + res1[1] = coint( + y[:, 0], y[:, 1:3], trend=trend, maxlag=4, autolag=None + ) res1[2] = coint(y[:, 0], y[:, 2:], trend=trend, maxlag=4, autolag=None) res1[3] = coint(y[:, 0], y[:, 1:], trend=trend, maxlag=4, autolag=None) @@ -404,11 +484,15 @@ def test_coint(): assert_allclose(r1, r2, rtol=0, atol=6e-7) # use default autolag #4490 - res1_0 = coint(y[:, 0], y[:, 1], trend='ct', maxlag=4) - assert_allclose(res1_0[2], res_egranger['ct'][0][1:], rtol=0, atol=6e-7) + res1_0 = coint(y[:, 0], y[:, 1], trend="ct", maxlag=4) + assert_allclose(res1_0[2], res_egranger["ct"][0][1:], rtol=0, atol=6e-7) # the following is just a regression test - assert_allclose(res1_0[:2], [-13.992946638547112, 2.270898990540678e-27], - rtol=1e-10, atol=1e-27) + assert_allclose( + res1_0[:2], + [-13.992946638547112, 2.270898990540678e-27], + rtol=1e-10, + atol=1e-27, + ) def test_coint_identical_series(): @@ -416,7 +500,7 @@ def test_coint_identical_series(): scale_e = 1 np.random.seed(123) y = scale_e * np.random.randn(nobs) - warnings.simplefilter('always', CollinearityWarning) + warnings.simplefilter("always", CollinearityWarning) with pytest.warns(CollinearityWarning): c = coint(y, y, trend="c", maxlag=0, autolag=None) assert_equal(c[1], 0.0) @@ -430,7 +514,7 @@ def test_coint_perfect_collinearity(): np.random.seed(123) x = scale_e * np.random.randn(nobs, 2) y = 1 + x.sum(axis=1) + 1e-7 * np.random.randn(nobs) - warnings.simplefilter('always', CollinearityWarning) + warnings.simplefilter("always", CollinearityWarning) with warnings.catch_warnings(record=True) as w: c = coint(y, x, trend="c", maxlag=0, autolag=None) assert_equal(c[1], 0.0) @@ -438,34 +522,36 @@ def test_coint_perfect_collinearity(): class TestGrangerCausality(object): - def test_grangercausality(self): # some example data mdata = macrodata.load_pandas().data - mdata = mdata[['realgdp', 'realcons']].values + mdata = mdata[["realgdp", "realcons"]].values data = mdata.astype(float) data = np.diff(np.log(data), axis=0) # R: lmtest:grangertest r_result = [0.243097, 0.7844328, 195, 2] # f_test gr = grangercausalitytests(data[:, 1::-1], 2, verbose=False) - assert_almost_equal(r_result, gr[2][0]['ssr_ftest'], decimal=7) - assert_almost_equal(gr[2][0]['params_ftest'], gr[2][0]['ssr_ftest'], - decimal=7) + assert_almost_equal(r_result, gr[2][0]["ssr_ftest"], decimal=7) + assert_almost_equal( + gr[2][0]["params_ftest"], gr[2][0]["ssr_ftest"], decimal=7 + ) def test_grangercausality_single(self): mdata = macrodata.load_pandas().data - mdata = mdata[['realgdp', 'realcons']].values + mdata = mdata[["realgdp", "realcons"]].values data = mdata.astype(float) data = np.diff(np.log(data), axis=0) gr = grangercausalitytests(data[:, 1::-1], 2, verbose=False) gr2 = grangercausalitytests(data[:, 1::-1], [2], verbose=False) assert 1 in gr assert 1 not in gr2 - assert_almost_equal(gr[2][0]['ssr_ftest'], gr2[2][0]['ssr_ftest'], - decimal=7) - assert_almost_equal(gr[2][0]['params_ftest'], gr2[2][0]['ssr_ftest'], - decimal=7) + assert_almost_equal( + gr[2][0]["ssr_ftest"], gr2[2][0]["ssr_ftest"], decimal=7 + ) + assert_almost_equal( + gr[2][0]["params_ftest"], gr2[2][0]["ssr_ftest"], decimal=7 + ) def test_granger_fails_on_nobs_check(self, reset_randomstate): # Test that if maxlag is too large, Granger Test raises a clear error. @@ -484,8 +570,9 @@ def test_granger_fails_on_finite_check(self, reset_randomstate): def test_granger_fails_on_zero_lag(self, reset_randomstate): x = np.random.rand(1000, 2) with pytest.raises( - ValueError, - match="maxlag must be a non-empty list containing only positive integers"): + ValueError, + match="maxlag must be a non-empty list containing only positive integers", + ): grangercausalitytests(x, [0, 1, 2]) @@ -500,15 +587,16 @@ class TestKPSS: In this context, x is the vector containing the macrodata['realgdp'] series. """ + @classmethod def setup(cls): cls.data = macrodata.load_pandas() - cls.x = cls.data.data['realgdp'].values + cls.x = cls.data.data["realgdp"].values def test_fail_nonvector_input(self, reset_randomstate): # should be fine with pytest.warns(InterpolationWarning): - kpss(self.x, nlags='legacy') + kpss(self.x, nlags="legacy") x = np.random.rand(20, 2) assert_raises(ValueError, kpss, x) @@ -516,38 +604,39 @@ def test_fail_nonvector_input(self, reset_randomstate): def test_fail_unclear_hypothesis(self): # these should be fine, with pytest.warns(InterpolationWarning): - kpss(self.x, 'c', nlags='legacy') + kpss(self.x, "c", nlags="legacy") with pytest.warns(InterpolationWarning): - kpss(self.x, 'C', nlags='legacy') + kpss(self.x, "C", nlags="legacy") with pytest.warns(InterpolationWarning): - kpss(self.x, 'ct', nlags='legacy') + kpss(self.x, "ct", nlags="legacy") with pytest.warns(InterpolationWarning): - kpss(self.x, 'CT', nlags='legacy') + kpss(self.x, "CT", nlags="legacy") - assert_raises(ValueError, kpss, self.x, "unclear hypothesis", - nlags='legacy') + assert_raises( + ValueError, kpss, self.x, "unclear hypothesis", nlags="legacy" + ) def test_teststat(self): with pytest.warns(InterpolationWarning): - kpss_stat, pval, lags, crits = kpss(self.x, 'c', 3) + kpss_stat, pval, lags, crits = kpss(self.x, "c", 3) assert_almost_equal(kpss_stat, 5.0169, DECIMAL_3) with pytest.warns(InterpolationWarning): - kpss_stat, pval, lags, crits = kpss(self.x, 'ct', 3) + kpss_stat, pval, lags, crits = kpss(self.x, "ct", 3) assert_almost_equal(kpss_stat, 1.1828, DECIMAL_3) def test_pval(self): with pytest.warns(InterpolationWarning): - kpss_stat, pval, lags, crits = kpss(self.x, 'c', 3) + kpss_stat, pval, lags, crits = kpss(self.x, "c", 3) assert_equal(pval, 0.01) with pytest.warns(InterpolationWarning): - kpss_stat, pval, lags, crits = kpss(self.x, 'ct', 3) + kpss_stat, pval, lags, crits = kpss(self.x, "ct", 3) assert_equal(pval, 0.01) def test_store(self): with pytest.warns(InterpolationWarning): - kpss_stat, pval, crit, store = kpss(self.x, 'c', 3, True) + kpss_stat, pval, crit, store = kpss(self.x, "c", 3, True) # assert attributes, and make sure they're correct assert_equal(store.nobs, len(self.x)) @@ -557,22 +646,22 @@ def test_store(self): def test_lags(self): # real GDP from macrodata data set with pytest.warns(InterpolationWarning): - res = kpss(self.x, 'c', nlags='auto') + res = kpss(self.x, "c", nlags="auto") assert_equal(res[2], 9) # real interest rates from macrodata data set - res = kpss(sunspots.load(True).data['SUNACTIVITY'], 'c', nlags='auto') + res = kpss(sunspots.load(True).data["SUNACTIVITY"], "c", nlags="auto") assert_equal(res[2], 7) # volumes from nile data set with pytest.warns(InterpolationWarning): - res = kpss(nile.load(True).data['volume'], 'c', nlags='auto') + res = kpss(nile.load(True).data["volume"], "c", nlags="auto") assert_equal(res[2], 5) # log-coinsurance from randhie data set with pytest.warns(InterpolationWarning): - res = kpss(randhie.load(True).data['lncoins'], 'ct', nlags='auto') + res = kpss(randhie.load(True).data["lncoins"], "ct", nlags="auto") assert_equal(res[2], 75) # in-vehicle time from modechoice data set with pytest.warns(InterpolationWarning): - res = kpss(modechoice.load(True).data['invt'], 'ct', nlags='auto') + res = kpss(modechoice.load(True).data["invt"], "ct", nlags="auto") assert_equal(res[2], 18) def test_kpss_fails_on_nobs_check(self): @@ -580,10 +669,11 @@ def test_kpss_fails_on_nobs_check(self): # clear error # GH5925 nobs = len(self.x) - msg = (r"lags \({}\) must be < number of observations \({}\)" - .format(nobs, nobs)) + msg = r"lags \({}\) must be < number of observations \({}\)".format( + nobs, nobs + ) with pytest.raises(ValueError, match=msg): - kpss(self.x, 'c', nlags=nobs) + kpss(self.x, "c", nlags=nobs) def test_kpss_autolags_does_not_assign_lags_equal_to_nobs(self): # Test that if *autolags* exceeds number of observations, we set @@ -610,17 +700,17 @@ def test_kpss_autolags_does_not_assign_lags_equal_to_nobs(self): def test_legacy_lags(self): # Test legacy lags are the same with pytest.warns(InterpolationWarning): - res = kpss(self.x, 'c', nlags='legacy') + res = kpss(self.x, "c", nlags="legacy") assert_equal(res[2], 15) def test_unknown_lags(self): # Test legacy lags are the same with pytest.raises(ValueError): - kpss(self.x, 'c', nlags='unknown') + kpss(self.x, "c", nlags="unknown") def test_deprecation(self): with pytest.warns(FutureWarning): - kpss(self.x, 'c') + kpss(self.x, "c") def test_pandasacovf(): @@ -630,7 +720,7 @@ def test_pandasacovf(): def test_acovf2d(reset_randomstate): dta = sunspots.load_pandas().data - dta.index = date_range(start='1700', end='2009', freq='A')[:309] + dta.index = date_range(start="1700", end="2009", freq="A")[:309] del dta["YEAR"] res = acovf(dta, fft=False) assert_equal(res, acovf(dta.values, fft=False)) @@ -639,8 +729,8 @@ def test_acovf2d(reset_randomstate): acovf(x, fft=False) -@pytest.mark.parametrize('demean', [True, False]) -@pytest.mark.parametrize('adjusted', [True, False]) +@pytest.mark.parametrize("demean", [True, False]) +@pytest.mark.parametrize("adjusted", [True, False]) def test_acovf_fft_vs_convolution(demean, adjusted): np.random.seed(1) q = np.random.normal(size=100) @@ -656,25 +746,33 @@ def test_arma_order_select_ic(): # smoke test, assumes info-criteria are right from statsmodels.tsa.arima_process import arma_generate_sample - arparams = np.array([.75, -.25]) - maparams = np.array([.65, .35]) + arparams = np.array([0.75, -0.25]) + maparams = np.array([0.65, 0.35]) arparams = np.r_[1, -arparams] maparam = np.r_[1, maparams] nobs = 250 np.random.seed(2014) y = arma_generate_sample(arparams, maparams, nobs) - res = arma_order_select_ic(y, ic=['aic', 'bic'], trend='nc') + res = arma_order_select_ic(y, ic=["aic", "bic"], trend="nc") # regression tests in case we change algorithm to minic in sas - aic_x = np.array([[np.nan, 552.7342255, 484.29687843], - [562.10924262, 485.5197969, 480.32858497], - [507.04581344, 482.91065829, 481.91926034], - [484.03995962, 482.14868032, 483.86378955], - [481.8849479, 483.8377379, 485.83756612]]) - bic_x = np.array([[np.nan, 559.77714733, 494.86126118], - [569.15216446, 496.08417966, 494.41442864], - [517.61019619, 496.99650196, 499.52656493], - [498.12580329, 499.75598491, 504.99255506], - [499.49225249, 504.96650341, 510.48779255]]) + aic_x = np.array( + [ + [np.nan, 552.7342255, 484.29687843], + [562.10924262, 485.5197969, 480.32858497], + [507.04581344, 482.91065829, 481.91926034], + [484.03995962, 482.14868032, 483.86378955], + [481.8849479, 483.8377379, 485.83756612], + ] + ) + bic_x = np.array( + [ + [np.nan, 559.77714733, 494.86126118], + [569.15216446, 496.08417966, 494.41442864], + [517.61019619, 496.99650196, 499.52656493], + [498.12580329, 499.75598491, 504.99255506], + [499.49225249, 504.96650341, 510.48779255], + ] + ) aic = DataFrame(aic_x, index=lrange(5), columns=lrange(3)) bic = DataFrame(bic_x, index=lrange(5), columns=lrange(3)) assert_almost_equal(res.aic.values, aic.values, 5) @@ -686,16 +784,17 @@ def test_arma_order_select_ic(): assert_(res.bic.index.equals(bic.index)) assert_(res.bic.columns.equals(bic.columns)) - index = pd.date_range('2000-1-1', freq='M', periods=len(y)) + index = pd.date_range("2000-1-1", freq="M", periods=len(y)) y_series = pd.Series(y, index=index) - res_pd = arma_order_select_ic(y_series, max_ar=2, max_ma=1, - ic=['aic', 'bic'], trend='nc') + res_pd = arma_order_select_ic( + y_series, max_ar=2, max_ma=1, ic=["aic", "bic"], trend="nc" + ) assert_almost_equal(res_pd.aic.values, aic.values[:3, :2], 5) assert_almost_equal(res_pd.bic.values, bic.values[:3, :2], 5) assert_equal(res_pd.aic_min_order, (2, 1)) assert_equal(res_pd.bic_min_order, (1, 1)) - res = arma_order_select_ic(y, ic='aic', trend='nc') + res = arma_order_select_ic(y, ic="aic", trend="nc") assert_almost_equal(res.aic.values, aic.values, 5) assert_(res.aic.index.equals(aic.index)) assert_(res.aic.columns.equals(aic.columns)) @@ -706,17 +805,32 @@ def test_arma_order_select_ic_failure(): # this should trigger an SVD convergence failure, smoke test that it # returns, likely platform dependent failure... # looks like AR roots may be cancelling out for 4, 1? - y = np.array([0.86074377817203640006, 0.85316549067906921611, - 0.87104653774363305363, 0.60692382068987393851, - 0.69225941967301307667, 0.73336177248909339976, - 0.03661329261479619179, 0.15693067239962379955, - 0.12777403512447857437, -0.27531446294481976, - -0.24198139631653581283, -0.23903317951236391359, - -0.26000241325906497947, -0.21282920015519238288, - -0.15943768324388354896, 0.25169301564268781179, - 0.1762305709151877342, 0.12678133368791388857, - 0.89755829086753169399, 0.82667068795350151511]) + y = np.array( + [ + 0.86074377817203640006, + 0.85316549067906921611, + 0.87104653774363305363, + 0.60692382068987393851, + 0.69225941967301307667, + 0.73336177248909339976, + 0.03661329261479619179, + 0.15693067239962379955, + 0.12777403512447857437, + -0.27531446294481976, + -0.24198139631653581283, + -0.23903317951236391359, + -0.26000241325906497947, + -0.21282920015519238288, + -0.15943768324388354896, + 0.25169301564268781179, + 0.1762305709151877342, + 0.12678133368791388857, + 0.89755829086753169399, + 0.82667068795350151511, + ] + ) import warnings + with warnings.catch_warnings(): # catch a hessian inversion and convergence failure warning warnings.simplefilter("ignore") @@ -726,7 +840,9 @@ def test_arma_order_select_ic_failure(): def test_acf_fft_dataframe(): # regression test #322 - result = acf(sunspots.load_pandas().data[['SUNACTIVITY']], fft=True, nlags=20) + result = acf( + sunspots.load_pandas().data[["SUNACTIVITY"]], fft=True, nlags=20 + ) assert_equal(result.ndim, 1) @@ -740,29 +856,43 @@ def test_levinson_durbin_acov(): assert_allclose(pacf, np.array([1, rho] + [0] * (m - 1)), atol=1e-8) -@pytest.mark.parametrize("missing", ['conservative', 'drop', 'raise', 'none']) +@pytest.mark.parametrize("missing", ["conservative", "drop", "raise", "none"]) @pytest.mark.parametrize("fft", [False, True]) @pytest.mark.parametrize("demean", [True, False]) @pytest.mark.parametrize("adjusted", [True, False]) def test_acovf_nlags(acovf_data, adjusted, demean, fft, missing): - full = acovf(acovf_data, adjusted=adjusted, demean=demean, fft=fft, - missing=missing) - limited = acovf(acovf_data, adjusted=adjusted, demean=demean, fft=fft, - missing=missing, nlag=10) + full = acovf( + acovf_data, adjusted=adjusted, demean=demean, fft=fft, missing=missing + ) + limited = acovf( + acovf_data, + adjusted=adjusted, + demean=demean, + fft=fft, + missing=missing, + nlag=10, + ) assert_allclose(full[:11], limited) -@pytest.mark.parametrize("missing", ['conservative', 'drop']) +@pytest.mark.parametrize("missing", ["conservative", "drop"]) @pytest.mark.parametrize("fft", [False, True]) @pytest.mark.parametrize("demean", [True, False]) @pytest.mark.parametrize("adjusted", [True, False]) def test_acovf_nlags_missing(acovf_data, adjusted, demean, fft, missing): acovf_data = acovf_data.copy() acovf_data[1:3] = np.nan - full = acovf(acovf_data, adjusted=adjusted, demean=demean, fft=fft, - missing=missing) - limited = acovf(acovf_data, adjusted=adjusted, demean=demean, fft=fft, - missing=missing, nlag=10) + full = acovf( + acovf_data, adjusted=adjusted, demean=demean, fft=fft, missing=missing + ) + limited = acovf( + acovf_data, + adjusted=adjusted, + demean=demean, + fft=fft, + missing=missing, + nlag=10, + ) assert_allclose(full[:11], limited) @@ -786,16 +916,16 @@ def test_pacf2acf_ar(): pacf[0] = 1 pacf[1] = 0.9 ar, acf = levinson_durbin_pacf(pacf) - assert_allclose(acf, 0.9 ** np.arange(10.)) + assert_allclose(acf, 0.9 ** np.arange(10.0)) assert_allclose(ar, pacf[1:], atol=1e-8) ar, acf = levinson_durbin_pacf(pacf, nlags=5) - assert_allclose(acf, 0.9 ** np.arange(6.)) + assert_allclose(acf, 0.9 ** np.arange(6.0)) assert_allclose(ar, pacf[1:6], atol=1e-8) def test_pacf2acf_levinson_durbin(): - pacf = -0.9 ** np.arange(11.) + pacf = -(0.9 ** np.arange(11.0)) pacf[0] = 1 ar, acf = levinson_durbin_pacf(pacf) _, ar_ld, pacf_ld, _, _ = levinson_durbin(acf, 10, isacov=True) @@ -803,13 +933,23 @@ def test_pacf2acf_levinson_durbin(): assert_allclose(pacf, pacf_ld, atol=1e-8) # From R, FitAR, PacfToAR - ar_from_r = [-4.1609, -9.2549, -14.4826, -17.6505, -17.5012, -14.2969, -9.5020, -4.9184, - -1.7911, -0.3486] + ar_from_r = [ + -4.1609, + -9.2549, + -14.4826, + -17.6505, + -17.5012, + -14.2969, + -9.5020, + -4.9184, + -1.7911, + -0.3486, + ] assert_allclose(ar, ar_from_r, atol=1e-4) def test_pacf2acf_errors(): - pacf = -0.9 ** np.arange(11.) + pacf = -(0.9 ** np.arange(11.0)) pacf[0] = 1 with pytest.raises(ValueError): levinson_durbin_pacf(pacf, nlags=20) @@ -847,7 +987,7 @@ def test_innovations_algo_brockwell_davis(): ma = -0.9 acovf = np.array([1 + ma ** 2, ma]) theta, sigma2 = innovations_algo(acovf, nobs=4) - exp_theta = np.array([[0], [-.4972], [-.6606], [-.7404]]) + exp_theta = np.array([[0], [-0.4972], [-0.6606], [-0.7404]]) assert_allclose(theta, exp_theta, rtol=1e-4) assert_allclose(sigma2, [1.81, 1.3625, 1.2155, 1.1436], rtol=1e-4) @@ -875,7 +1015,7 @@ def test_innovations_errors(): with pytest.raises(ValueError): innovations_algo(np.empty((2, 2))) with pytest.raises(TypeError): - innovations_algo(acovf, rtol='none') + innovations_algo(acovf, rtol="none") def test_innovations_filter_brockwell_davis(reset_randomstate): @@ -897,8 +1037,7 @@ def test_innovations_filter_pandas(reset_randomstate): acovf = np.array([1 + (ma ** 2).sum(), ma[0] + ma[1] * ma[0], ma[1]]) theta, _ = innovations_algo(acovf, nobs=10) endog = np.random.randn(10) - endog_pd = pd.Series(endog, - index=pd.date_range('2000-01-01', periods=10)) + endog_pd = pd.Series(endog, index=pd.date_range("2000-01-01", periods=10)) resid = innovations_filter(endog, theta) resid_pd = innovations_filter(endog_pd, theta) assert_allclose(resid, resid_pd.values) @@ -930,8 +1069,9 @@ def test_innovations_algo_filter_kalman_filter(reset_randomstate): endog = np.random.normal(size=10) # Innovations algorithm approach - acovf = arma_acovf(np.r_[1, -ar_params], np.r_[1, ma_params], - nobs=len(endog)) + acovf = arma_acovf( + np.r_[1, -ar_params], np.r_[1, ma_params], nobs=len(endog) + ) theta, v = innovations_algo(acovf) u = innovations_filter(endog, theta) @@ -944,8 +1084,9 @@ def test_innovations_algo_filter_kalman_filter(reset_randomstate): # Test that the two approaches are identical atol = 1e-6 if PLATFORM_WIN else 0.0 assert_allclose(u, res.forecasts_error[0], rtol=1e-6, atol=atol) - assert_allclose(theta[1:, 0], res.filter_results.kalman_gain[0, 0, :-1], - atol=atol) + assert_allclose( + theta[1:, 0], res.filter_results.kalman_gain[0, 0, :-1], atol=atol + ) assert_allclose(llf_obs, res.llf_obs, atol=atol) @@ -954,25 +1095,25 @@ def test_adfuller_short_series(reset_randomstate): res = adfuller(y, store=True) assert res[-1].maxlag == 1 y = np.random.standard_normal(2) - with pytest.raises(ValueError, match='sample size is too short'): + with pytest.raises(ValueError, match="sample size is too short"): adfuller(y) y = np.random.standard_normal(3) - with pytest.raises(ValueError, match='sample size is too short'): - adfuller(y, regression='ct') + with pytest.raises(ValueError, match="sample size is too short"): + adfuller(y, regression="ct") def test_adfuller_maxlag_too_large(reset_randomstate): y = np.random.standard_normal(100) - with pytest.raises(ValueError, match='maxlag must be less than'): + with pytest.raises(ValueError, match="maxlag must be less than"): adfuller(y, maxlag=51) class SetupZivotAndrews(object): # test directory cur_dir = CURR_DIR - run_dir = os.path.join(cur_dir, 'results') + run_dir = os.path.join(cur_dir, "results") # use same file for testing failure modes - fail_file = os.path.join(run_dir, 'rgnp.csv') + fail_file = os.path.join(run_dir, "rgnp.csv") fail_mdl = np.asarray(pd.read_csv(fail_file)) @@ -981,7 +1122,7 @@ class TestZivotAndrews(SetupZivotAndrews): # failure mode tests def test_fail_regression_type(self): with pytest.raises(ValueError): - zivot_andrews(self.fail_mdl, regression='x') + zivot_andrews(self.fail_mdl, regression="x") def test_fail_trim_value(self): with pytest.raises(ValueError): @@ -993,62 +1134,76 @@ def test_fail_array_shape(self): def test_fail_autolag_type(self): with pytest.raises(ValueError): - zivot_andrews(self.fail_mdl, autolag='None') + zivot_andrews(self.fail_mdl, autolag="None") - @pytest.mark.parametrize('autolag', ["AIC", "aic", "Aic"]) + @pytest.mark.parametrize("autolag", ["AIC", "aic", "Aic"]) def test_autolag_case_sensitivity(self, autolag): res = zivot_andrews(self.fail_mdl, autolag=autolag) assert res[3] == 1 # following tests compare results to R package urca.ur.za (1.13-0) def test_rgnp_case(self): - res = zivot_andrews(self.fail_mdl, maxlag=8, regression='c', - autolag=None) - assert_allclose([res[0], res[1], res[4]], - [-5.57615, 0.00312, 20], rtol=1e-3) + res = zivot_andrews( + self.fail_mdl, maxlag=8, regression="c", autolag=None + ) + assert_allclose( + [res[0], res[1], res[4]], [-5.57615, 0.00312, 20], rtol=1e-3 + ) def test_gnpdef_case(self): - mdlfile = os.path.join(self.run_dir, 'gnpdef.csv') + mdlfile = os.path.join(self.run_dir, "gnpdef.csv") mdl = np.asarray(pd.read_csv(mdlfile)) - res = zivot_andrews(mdl, maxlag=8, regression='c', autolag='t-stat') - assert_allclose([res[0], res[1], res[3], res[4]], - [-4.12155, 0.28024, 5, 40], rtol=1e-3) + res = zivot_andrews(mdl, maxlag=8, regression="c", autolag="t-stat") + assert_allclose( + [res[0], res[1], res[3], res[4]], + [-4.12155, 0.28024, 5, 40], + rtol=1e-3, + ) def test_stkprc_case(self): - mdlfile = os.path.join(self.run_dir, 'stkprc.csv') + mdlfile = os.path.join(self.run_dir, "stkprc.csv") mdl = np.asarray(pd.read_csv(mdlfile)) - res = zivot_andrews(mdl, maxlag=8, regression='ct', autolag='t-stat') - assert_allclose([res[0], res[1], res[3], res[4]], - [-5.60689, 0.00894, 1, 65], rtol=1e-3) + res = zivot_andrews(mdl, maxlag=8, regression="ct", autolag="t-stat") + assert_allclose( + [res[0], res[1], res[3], res[4]], + [-5.60689, 0.00894, 1, 65], + rtol=1e-3, + ) def test_rgnpq_case(self): - mdlfile = os.path.join(self.run_dir, 'rgnpq.csv') + mdlfile = os.path.join(self.run_dir, "rgnpq.csv") mdl = np.asarray(pd.read_csv(mdlfile)) - res = zivot_andrews(mdl, maxlag=12, regression='t', autolag='t-stat') - assert_allclose([res[0], res[1], res[3], res[4]], - [-3.02761, 0.63993, 12, 102], rtol=1e-3) + res = zivot_andrews(mdl, maxlag=12, regression="t", autolag="t-stat") + assert_allclose( + [res[0], res[1], res[3], res[4]], + [-3.02761, 0.63993, 12, 102], + rtol=1e-3, + ) def test_rand10000_case(self): - mdlfile = os.path.join(self.run_dir, 'rand10000.csv') + mdlfile = os.path.join(self.run_dir, "rand10000.csv") mdl = np.asarray(pd.read_csv(mdlfile)) - res = zivot_andrews(mdl, regression='c', autolag='t-stat') - assert_allclose([res[0], res[1], res[3], res[4]], - [-3.48223, 0.69111, 25, 7071], rtol=1e-3) + res = zivot_andrews(mdl, regression="c", autolag="t-stat") + assert_allclose( + [res[0], res[1], res[3], res[4]], + [-3.48223, 0.69111, 25, 7071], + rtol=1e-3, + ) def test_acf_conservate_nanops(reset_randomstate): # GH 6729 e = np.random.standard_normal(100) for i in range(1, e.shape[0]): - e[i] += 0.9 * e[i-1] + e[i] += 0.9 * e[i - 1] e[::7] = np.nan result = acf(e, missing="conservative", nlags=10, fft=False) resid = e - np.nanmean(e) expected = np.ones(11) nobs = e.shape[0] gamma0 = np.nansum(resid * resid) - for i in range(1, 10+1): - expected[i] = np.nansum(resid[i:] * resid[:nobs-i]) / gamma0 + for i in range(1, 10 + 1): + expected[i] = np.nansum(resid[i:] * resid[: nobs - i]) / gamma0 assert_allclose(result, expected, rtol=1e-4, atol=1e-4) @@ -1056,3 +1211,22 @@ def test_pacf_nlags_error(reset_randomstate): e = np.random.standard_normal(100) with pytest.raises(ValueError, match="Can only compute partial"): pacf(e, 50) + + +def test_coint_auto_tstat(): + import numpy as np + from statsmodels.tsa.api import coint + + rs = np.random.RandomState(3733696641) + x = np.cumsum(rs.standard_normal(100)) + y = np.cumsum(rs.standard_normal(100)) + res = coint( + x, + y, + trend="c", + method="aeg", + maxlag=0, + autolag="t-stat", + return_results=False, + ) + assert np.abs(res[0]) < 1.65