In [1]:
from google.colab import drive
drive.mount('/content/drive')

ValueError: mount failed

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller
import numpy as np
from typing import Union
from tqdm.notebook import tqdm
from statsmodels.tsa.statespace.sarimax import SARIMAX
from itertools import product
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.seasonal import STL

import warnings

warnings.filterwarnings("ignore", message="Maximum Likelihood optimization failed to converge")
warnings.filterwarnings("ignore", message="Non-stationary starting autoregressive parameters")
warnings.filterwarnings("ignore", message="Non-invertible starting MA parameters found")


# 8.5.1 존슨앤드존슨 데이터 집합에 SARIMA 모델 적용하기

## 1. 시계열 분해를 사용하여 주기적 패턴의 존재를 식별한다.

In [None]:
df = pd.read_csv('/content/drive/MyDrive/BK21/data/jj.csv')
df.head()

In [None]:
fig, ax = plt.subplots()

ax.plot(df.date, df.data)
ax.set_xlabel('Date')
ax.set_ylabel('Earnings per share (USD)')

plt.xticks(np.arange(0, 81, 8), [1960, 1962, 1964, 1966, 1968, 1970, 1972, 1974, 1976, 1978, 1980])

fig.autofmt_xdate()
plt.tight_layout()

In [None]:
decomposition = STL(df['data'], period=4).fit()

fig, (ax1, ax2, ax3, ax4) = plt.subplots(nrows=4, ncols=1, sharex=True, figsize=(10,8))

ax1.plot(decomposition.observed)
ax1.set_ylabel('Observed')

ax2.plot(decomposition.trend)
ax2.set_ylabel('Trend')

ax3.plot(decomposition.seasonal)
ax3.set_ylabel('Seasonal')

ax4.plot(decomposition.resid)
ax4.set_ylabel('Residual')

plt.xticks(np.arange(0, 81, 8), [1960, 1962, 1964, 1966, 1968, 1970, 1972, 1974, 1976, 1978, 1980])

fig.autofmt_xdate()
plt.tight_layout()

## 2. optimize_SARIMA 함수를 사용하여 AIC가 가장 낮은 모델 선택

In [None]:
ad_fuller_result = adfuller(df['data'])

print(f'ADF Statistic: {ad_fuller_result[0]}')
print(f'p-value: {ad_fuller_result[1]}')

In [None]:
data_diff = np.diff(df['data'], n = 1)

In [None]:
ad_fuller_result = adfuller(data_diff)

print(f'ADF Statistic: {ad_fuller_result[0]}')
print(f'p-value: {ad_fuller_result[1]}')

In [None]:
data_diff_seasonal_diff = np.diff(data_diff, n = 4)

In [None]:
ad_fuller_result = adfuller(data_diff_seasonal_diff)

print(f'ADF Statistic: {ad_fuller_result[0]}')
print(f'p-value: {ad_fuller_result[1]}')

In [None]:
def optimize_SARIMA(endog: Union[pd.Series, list], order_list: list, d: int, D: int, s: int) -> pd.DataFrame:

  results = []

  for order in tqdm(order_list):
    try:
      model = SARIMAX(
          endog,
          order=(order[0], d, order[1]),
          seasonal_order=(order[2], D, order[3], s),
          simple_differencing=False).fit(disp=False)
    except:
      continue

    aic = model.aic
    results.append([order, aic])

  result_df = pd.DataFrame(results)
  result_df.columns = ['SARIMA_order', 'AIC']

  result_df = result_df.sort_values(by='AIC', ascending=True).reset_index(drop=True)
  return result_df


In [None]:
train = df[:-4]
test = df[-4:]

print(len(test))
print(test.head())
print(type(test))

In [None]:
ps = range(0, 4, 1)
qs = range(0, 4, 1)
Ps = range(0, 4, 1)
Qs = range(0, 4, 1)

SARIMA_order_list = list(product(ps, qs, Ps, Qs))


d = 1
D = 1
s = 4

SARIMA_result_df = optimize_SARIMA(train, SARIMA_order_list, d, D, s)
SARIMA_result_df.head()
#SARIMA(3,1,0)(2,1,1)4

## 3. 잔차 분석

In [None]:
SARIMA_model = SARIMAX(train, order=(3,1,0), seasonal_order=(2,1,1,4), simple_differencing=False)
SARIMA_model_fit = SARIMA_model.fit(disp=False)

# Q-Q 도식
SARIMA_model_fit.plot_diagnostics(figsize=(10, 8))
plt.show()

In [None]:
# 융-박스 테스트
residuals = SARIMA_model_fit.resid
pvalue = acorr_ljungbox(residuals, np.arange(1, 11, 1))

print(pvalue["lb_pvalue"].to_numpy())

## 4. 지난 1년간의 주당순이익을 예측하고 ARIMA 모델의 성능 측정, MAPE 사용

In [None]:
SARIMA_pred = SARIMA_model_fit.get_prediction(80, 83).predicted_mean

df_predict = test.copy()

df_predict.loc[:, 'SARIMA_pred'] = SARIMA_pred
df_predict

In [None]:
def mape(y_true, y_pred):
  return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

In [None]:
mape_SARIMA = mape(df_predict['data'], df_predict['SARIMA_pred'])
print(mape_SARIMA)
# ARIMA(3,2,3)의 MAPE 1.73