In [None]:
from statsmodels.tsa.seasonal import seasonal_decompose
from pandas import read_csv, concat
from dateutil.parser import parse
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
import statsmodels.formula.api as smf
import statsmodels.tsa.api as smt
import statsmodels.api as sm
import scipy.stats as scs
from itertools import product
from tqdm.notebook import tqdm  
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.arima.model import ARIMA 
from sklearn.model_selection import ParameterGrid
import matplotlib.patches as mpatches
from statsmodels.tsa.seasonal import seasonal_decompose

# 配置 Matplotlib 正常显示中文标签和负号
plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False    # 用来正常显示负号

# 读取新能源汽车数据
df_nev_sale = pd.read_excel('E:\\qjy\\ecnu\\毕业论文\\新能源汽车数据\\中国-新能源汽车数据-已处理\\中国新能源汽车销量-已处理.xlsx', index_col=0)

# 统计缺失值
missing_values = df_nev_sale['nev_sale'].isnull().sum()
print(f'Number of missing values: {missing_values}')

# 设置日期索引
df_nev_sale.index = pd.date_range('2014-01-01', '2024-01-01', freq='M')
df_nev_sale = df_nev_sale[24:]  # 从第24行开始保留

# 乘法和加法分解
# 加法分解当中的残差有一些遗留模式，而乘法分解的残差没有趋势。所以乘法分解应该在这种特定的序列当中优先选择
# Multiplicative Decomposition
result_mul = seasonal_decompose(df_nev_sale['nev_sale'], model='multiplicative', extrapolate_trend='freq')

# Additive Decomposition
result_add = seasonal_decompose(df_nev_sale['nev_sale'], model='additive', extrapolate_trend='freq')

# 绘图
plt.rcParams.update({'figure.figsize': (10, 10)})
result_mul.plot().suptitle('Multiplicative Decompose', fontsize=6)
result_add.plot().suptitle('Additive Decompose', fontsize=6)
plt.show()

# 划分训练集和测试集
df_nev_sale_train = df_nev_sale.loc['2016-01-01':'2022-12-31', ['nev_sale']]
# 划分测试集
df_nev_sale_test = df_nev_sale.loc['2022-01-31':, ['nev_sale']]

# 平稳性检验：原数据非平稳
# ADF Test (Augmented Dickey-Fuller Test)
from statsmodels.tsa.stattools import adfuller

result = adfuller(df_nev_sale_train['nev_sale'].values, autolag='AIC')
print(f'ADF Statistic: {result[0]}')
print(f'p-value: {result[1]}')
print('Critical Values:')
for key, value in result[4].items():
    print(f'{key}: {value}')

# 数据的1阶差分+季节差分后平稳
# 一阶差分
df_nev_sale_train['nev_sale_diff1'] = df_nev_sale_train['nev_sale'].diff()
df_nev_sale_train.dropna(axis=0, how='any', inplace=True)

# 季节差分（周期为12个月）
df_nev_sale_train['nev_sale_diff1_diff12'] = df_nev_sale_train['nev_sale_diff1'].diff(12)
df_nev_sale_train.dropna(axis=0, how='any', inplace=True)

# 再次进行 ADF Test
result = adfuller(df_nev_sale_train['nev_sale_diff1_diff12'].values, autolag='AIC')
print(f'ADF Statistic: {result[0]}')
print(f'p-value: {result[1]}')
print('Critical Values:')
for key, value in result[4].items():
    print(f'{key}: {value}')

# 白噪声检验
from statsmodels.stats.diagnostic import acorr_ljungbox

# Ljung-Box test for autocorrelation (白噪声检验)
print(u'变换一阶 12 步序列的白噪声检验结果为：\n', acorr_ljungbox(df_nev_sale_train['nev_sale_diff1_diff12'].values, lags=12))

# MAPE (Mean Absolute Percentage Error)
def mean_absolute_percentage_error(y_true, y_pred):
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

# 时序图、ACF和PACF图及Dickey-Fuller检验的可视化函数
def tsplot(y, lags=None, figsize=(12, 7), style='bmh'):
    """
    绘制时间序列、ACF和PACF图，并计算Dickey–Fuller检验
    y - 时间序列
    lags - 用于ACF和PACF计算的滞后数
    """
    if not isinstance(y, pd.Series):
        y = pd.Series(y)

    with plt.style.context(style):
        fig = plt.figure(figsize=figsize)
        layout = (2, 2)
        ts_ax = plt.subplot2grid(layout, (0, 0), colspan=2)
        acf_ax = plt.subplot2grid(layout, (1, 0))
        pacf_ax = plt.subplot2grid(layout, (1, 1))

        y.plot(ax=ts_ax)
        ts_ax.set_title('Time Series')
        smt.graphics.plot_acf(y, lags=lags, ax=acf_ax)
        smt.graphics.plot_pacf(y, lags=lags, ax=pacf_ax)
        plt.tight_layout()
        plt.show()

        # 进行 ADF 检验并打印结果
        result = adfuller(y.values, autolag='AIC')
        print(f'ADF Statistic: {result[0]}')
        print(f'p-value: {result[1]}')
        print('Critical Values:')
        for key, value in result[4].items():
            print(f'{key}: {value}')
        # pacf_ax 继续绘图
        pacf_ax = plt.subplot2grid(layout, (1, 1))

        # 绘制时间序列图
        y.plot(ax=ts_ax)
        p_value = adfuller(y.values, autolag='AIC')[1]
        ts_ax.set_title('Time Series Analysis Plots\n Dickey-Fuller: p={0:.5f}'.format(p_value))

        # 绘制 ACF 和 PACF 图
        smt.graphics.plot_acf(y, lags=lags, ax=acf_ax)
        smt.graphics.plot_pacf(y, lags=lags, ax=pacf_ax)
        plt.tight_layout()

# 使用 tsplot 函数可视化
tsplot(df_nev_sale_train['nev_sale_diff1_diff12'], lags=30)

# ARIMA 参数设置
ps = range(0, 2)
d = range(1, 2)
qs = range(0, 2)

# 季节性相关的参数设置
Ps = range(0, 2)
D = range(1, 2)
Qs = range(0, 2)

# 将参数打包，使用 BIC 准则进行参数选择
params_list = list(product(ps, d, qs, Ps, D, Qs))
print(params_list)

# 根据 AIC 寻找最优参数
def find_best_params(data: np.array, params_list):
    result = []
    best_aic = 100000  # 初始化 AIC 值
    best_model = None
    best_param = None
    
    for param in tqdm(params_list):  # tqdm_notebook 已弃用，使用 tqdm.notebook
        # 拟合 SARIMA 模型
        try:
            model = SARIMAX(data, order=(param[0], param[1], param[2]),
                            seasonal_order=(param[3], param[4], param[5], 12)).fit(disp=-1)
            aicc = model.aic  # 获取模型的 AIC 值

            # 寻找最优模型
            if aicc < best_aic:
                best_model = model
                best_aic = aicc
                best_param = param
                param_1 = (param[0], param[1], param[2])
                param_2 = (param[3], param[4], param[5], 12)
                param_str = 'SARIMA{0}x{1}'.format(param_1, param_2)
                print(param_str)

            result.append([param_str, model.aic])

        except Exception as e:
            print(f"Error for parameters {param}: {e}")
            continue

    result_table = pd.DataFrame(result)
    result_table.columns = ['parameters', 'aic']
    result_table = result_table.sort_values(by='aic', ascending=True).reset_index(drop=True)
    return result_table

# 划分训练集
df_train = df_nev_sale.loc['2016-01-31':'2022-12-31', ['nev_sale']]

# 寻找最佳参数组合
result_table = find_best_params(df_train['nev_sale'].values, params_list)
print(result_table)

# 使用 SARIMAX 模型进行拟合
# SARIMAX(0, 1, 1)x(0, 1, 0, 12)
ma = SARIMAX(df_train, order=(0, 1, 1), seasonal_order=(0, 1, 0, 12)).fit(disp=-1)

# 获取残差并绘制诊断图
resid = ma.resid
fig = ma.plot_diagnostics(figsize=(15, 12))
plt.show()

# 模型摘要
ma.summary()

# 白噪声检验
print(acorr_ljungbox(resid, lags=6))

# 判断残差是否为白噪声序列
# 如果 p 值都大于 0.05，则接受原假设，认为残差是一个白噪声序列

# 使用 SARIMAX(1, 1, 0)x(1, 1, 0, 12) 对 2023 年进行预测
# 预测 2023 年 1 月到 2023 年 12 月的销量
forecast = ma.get_forecast(steps=12)

# 获取预测结果和置信区间
forecast_values = forecast.predicted_mean
confidence_intervals = forecast.conf_int()

# 构建结果 DataFrame
forecast_df = pd.DataFrame({
    '真实值': df_nev_sale.loc['2023-01-01':'2023-12-31', 'nev_sale'],  # 从原始数据中提取真实值
    '预测值': forecast_values,
    '预测下限': confidence_intervals.iloc[:, 0],
    '预测上限': confidence_intervals.iloc[:, 1]
})

# 计算 MAPE (Mean Absolute Percentage Error)
mape = mean_absolute_percentage_error(forecast_df['真实值'], forecast_df['预测值'])
print(f'MAPE: {mape}%')

# 预测未来的值
forecast = ma.predict(12, 95)

# 构建包含历史数据和预测数据的 DataFrame
forecast_df_all = pd.DataFrame({
    '真实值': df_nev_sale.loc['2017-01-01':'2023-12-31', 'nev_sale'],  # 从原始数据中提取真实值
    '预测值': forecast
})

# 绘制真实值和预测值
plt.figure(figsize=(10, 6))
plt.plot(forecast_df_all['真实值'], label='真实值', color='blue')
plt.plot(forecast_df_all['预测值'], label='预测值', color='red', linestyle='--')
plt.axvline(x=pd.to_datetime('2023-01-01'), color='grey', linestyle='--', label='预测起始点')
plt.title('SARIMA 模型预测值和真实值')
plt.xlabel('时间')
plt.ylabel('销量')
plt.legend()
plt.show()
