In [11]:
from hossam import load_data
from pandas import DataFrame, to_datetime
from matplotlib import pyplot as plt
from matplotlib import dates
from matplotlib import font_manager as fm
import seaborn as sb
import numpy as np

from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA

In [12]:
my_dpi = 200                                    # 이미지 선명도(100~300)
font_path = "./NotoSansKR-Regular.ttf"          # 한글을 지원하는 폰트 파일의 경로
fm.fontManager.addfont(font_path)               # 폰트의 글꼴을 시스템에 등록함
font_prop = fm.FontProperties(fname=font_path)  # 폰트의 속성을 읽어옴
font_name = font_prop.get_name()                # 읽어온 속성에서 폰트의 이름만 추출     
plt.rcParams['font.family'] = font_name         # 그래프에 한글 폰트 적용
plt.rcParams['font.size'] = 10                  # 기본 폰트크기
plt.rcParams['axes.unicode_minus'] = False      # 그래프에 마이너스 깨짐 방지(한글환경에서 필수)

In [13]:
def diff(df, yname):
    diff_df = df.copy()
    diff_count=0
    result=[]
    while True:
        ar = adfuller(diff_df[yname])
        ar_dict ={
            "차수" : diff_count,
            "ADF Statistic":ar[0],
            "p-value":ar[1],
            'result':True if ar[1]<=0.05 else False
        }

        result.append(ar_dict)

        if ar_dict['result']:
            return DataFrame(result), diff_df
        diff_count +=1
        diff_df = diff_df.diff().dropna()

def acf_pacf(df, yname):
    diff_df = df.copy()


    #ACF 계신
    acf_vals = acf(diff_df['Passengers'])
    threshold = 2/np.sqrt(len(diff_df['Passengers']))

# 결과표 생성
    df_acf = DataFrame({
        "lag": np.arange(len(acf_vals)),
        "acf":acf_vals,

})

#유의성 판단

    df_acf["abs_acf"] = df_acf["acf"].abs()
    df_acf["significant"] = df_acf["abs_acf"]> threshold



    df_acf["acf"] = df_acf["acf"].round(3)
    df_acf["abs_acf"]=df_acf["abs_acf"].round(3)
    df_acf["threshold"]=round(threshold,3)

    #lag=0 제외 (판정용)
    df_acf_result = df_acf.query("lag>0").reset_index(drop=True)

    #연속 유의 구간 계산
    df_acf_result["prev_significant"] = df_acf_result["significant"].shift(1)

    df_acf_result["cut_candidate"] = (
        (df_acf_result["prev_significant"] == True)&
        (df_acf_result["significant"] == False)
)

    q_candidate = (
        df_acf_result
        .loc[df_acf_result["cut_candidate"], "lag"]
        .min()-1
    )

    width_px = 2000
    height_px = 500
    rows = 2
    cols = 1
    figsize = (width_px/ my_dpi, height_px/ my_dpi)
    fig, ax = plt.subplots(rows, cols, figsize=figsize, dpi= my_dpi)

#ACF Plot그리기->ax파라미터 필수
    plot_acf(diff_df[yname], ax=ax[0])

#MA(q)후보 시각화

    ax[0].axvline(
        x=q_candidate,
        linestyle="--",
        linewidth=1.5,
        alpha=0.8,
        color = 'red'
)
    ax[0].text(
        q_candidate+0.1,
        ax[0].get_ylim()[1] * 0.9,
        f"MA(q) candidate={q_candidate}",
        fontsize=9,
        verticalalignment="top"
)
    ax[0].set_title("ACF Plot", fontsize=12, pad=8)
    ax[0].set_xlabel("Lag", fontsize=8, labelpad=5)
    ax[0].set_ylabel("Autocorrelation", fontsize=8, labelpad=5)
    ax[0].grid(True, alpha=0.3)

    plot_pacf(diff_df[yname], ax=ax[1])

    #MA(q)후보 시각화

    ax[1].axvline(
        x=p_candidate,
        linestyle="--",
        linewidth=1.5,
        alpha=0.8,
        color = 'red'
)
    ax[1].text(
        p_candidate+0.1,
        ax[1].get_ylim()[1] * 0.9,
        f"AR(q) candidate={p_candidate}",
        fontsize=9,
        verticalalignment="top"
)
    ax[1].set_title("PACF Plot", fontsize=12, pad=8)
    ax[1].set_xlabel("Lag", fontsize=8, labelpad=5)
    ax[1].set_ylabel("Partial Autocorrelation", fontsize=8, labelpad=5)
    ax[1].grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    plt.close()

    return p_candidate, q_candidate

In [14]:
def arima(df, yname, p, d, q, s = None):
    p=0 if np.isnan(p) else p
    d = 0 if np.isnan(d) else d
    q = 0 if np.isnan(q) else q
    s = 0 if np.isnan(s) else s

    result = []
    for x in range(0, p+1):
        for y in range(0, d+1):
            for z in range(0, q+1):
                try:
                    if not s:
                        print(f"p={x}, d={y}, q={z}")
                        model = ARIMA(df[yname], order=(x,y,z))

                    else:
                        print(f"p={x}, d={y}, q={z}, s={s}")
                        model = ARIMA(df[yname], order=(x, y, z),seasonal_order=(x, y, z,6))
                    
                    fit =model.fit()

                    if not fit.mle_retvals['converged']:
                        continue

                    results.append({
                        'p':x,
                        'd':y,
                        'q':z,
                        'AIC':fit.aic,
                        'Bic':fit.bic
                    })

                except Exception as e:
                    continue
    df_results = DataFrame(results)
    best_model = df_results.sort_values(['BIC', 'AIC']).iloc[0]
    return df_results, best_model

In [None]:
def arima_report(fit, data, threshol=0.05):
    """
    SARIMAXResults 객체(fit)와 원본 데이터를 받아
    모형 적합도 표(cdf),
    계수 요약 표(rdf),
    모형 요약 문장(result_report),
    모형 판정 문장(model_report),
    계수별 해석 문장(variable_reports)를 반환한다.
    """
#-----------------------------
#   모형 적합도 요약
    cdf = DataFrame({
        "Log Likelihood": [fit.lif],
        "AIC": [fit.aic],
        "BIC":[fit.bic],
        "HQIC": [fit.hqic],
        "관측치 수": [fit.nibs]
    })
    #-------------
    #계수테이블 구성
    params = fit.params
    bse = fit.bse
    zvals = params/bse
    pvals = fit.pvalues
    conf = fit.conf_int()

    rows = []
    for name in params.index:
        p= pvals[name]
        stars = (
            "***" if p<0.001 else
            "**" if p<0.01 else
            "*" if p<0.05 else
            ""
        )

        rows.append({
            "변수" : name,
            "계수": params[name],
            "표준오차":bse[name],
            "z":f"{zvals[name]:.3f}{stars}",
            "p-value":p,
            "CI_lower":conf.loc[name, 0],
            "CI_upper":conf.loc[name,1]
        })

        rdf = DataFrame(rows)

        #-----------------------------
        #모형 요약 문장
        result_report = (
            f"Log Likelihood = {fit.lif:.3f},"
            f"AIC = {fit.aic:.3f}, "
            f"BIC = {fit.bic:.3f}." 
        )
        #-------------------------------
        #모형 판정 문장
        lb = fit.test_serial_correlation(method="ljungbox")
        #statsmodels 버전마다 pvalue 값의 인덱스가 상이함
        lb_pvalue = lb[0][1][-1]

        model_report = (
            f"ARIMA{fit.model.order}*{fit.model.seasonal_order}모형을 적합한 결과,"
            f"AIC {fit.aic:.3f}, BIC {fit.bic:.3f}로 나타났으며"
            
            )
        if lb_pvalue>= threshold:
            model_report +=(
                "잔차들 사이에 특별한 시간적 패턴은 관찰되지 않음을 통계적으로 확인하였다."
                "(잔차의 자기상관은 Ljung-Box 검정에서 통계적으로 유의하다)"
            )
        #-------------------------------------
        #계수별 해설 문장(시계열 특성 설명 포함)
        variable_reports = []

        for_, row in rdf.iterrows():
            name = row['변수']
        
            if name =="const":
                continue
            coef = row['계수']
            pval = row['p-value']
        
        #---변수 유형 해석---
        if name.startswitch("ar.S"):
            meaning = "한 시즌 전 같은 시점의 값이 현재 값에 미치는 영향"
        elif name.startswith("ma.S"):
            meaning = "한 시즌 전 같은  시점에서 발생한 예측 오차가 현재에 남긴 영향"
        elif name.startswith("ar."):
            meaning = "직전 시점의 값이 현재 값에 미치는 영향"
        elif name.startswith("ma."):
            meaning = "직전 시점에서 발생한 예측 오차가 현재에 남긴 영향 "
        elif name=="const":
            meaning="전체 시계열의 기분 수준"
        else:
            meaning ="시계열의 특정 구조적 요소"

            #--- 방향 해석 ---
        if codf>0:
            direction ="값을 높이는 방향"
        elif codf <0:
            direction="값을 낮추는 방향"
        else:
            direction="뚜렷한 방향성은 없음"
        #---통게적 유의성---
        if pval<threshold:
            stat_text = "통계적으로 의미가 있다"
            plain_text = "우연이 아니라 반복되는 패턴일 가능성이 높다."
        
        else:
            stat_text = "통계적으로 뚜렷하지 않다"
            plain_text = "우연에 의해 나타났을 가능성을 배제하기 어렵다"

        variable_reports.append(
            f""
        )