In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.stats.diagnostic import acorr_ljungbox
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error

In [None]:
def get_file_path(folder_name):
    """Recursively collect the paths of all ``.xlsx`` files under a folder.

    Parameters
    ----------
    folder_name : str
        Folder to scan. It is joined onto ``os.curdir``, so a relative name
        is resolved against the current working directory and an absolute
        path is used unchanged.

    Returns
    -------
    list of str
        Flat list of workbook paths (extension matched case-insensitively).
        Entries are visited in sorted name order so that positional access
        to the result is reproducible; ``os.listdir`` alone gives no
        ordering guarantee.

    Raises
    ------
    OSError
        Propagated from ``os.listdir`` when the folder cannot be listed.
    """
    root = os.path.join(os.curdir, folder_name)

    def _collect(folder):
        # Depth-first walk that returns one flat list for `folder`.
        found = []
        for entry in sorted(os.listdir(folder)):
            entry_path = os.path.join(folder, entry)
            if os.path.isdir(entry_path):
                # Only real directories are descended into; extend keeps
                # the result flat instead of nesting a list per sub-folder.
                found.extend(_collect(entry_path))
            elif entry_path.lower().endswith(".xlsx"):
                found.append(entry_path)
            # Any other regular file is ignored.
        return found

    return _collect(root)

In [108]:
# For every CSV in `path`, keep the rows dated 2016-2022, reduce them to one
# row per calendar month (the latest one) and write the result under `spath`
# using the same file name.
path = './国内各省汽油定价'
spath = './5city'

# Date window to keep, inclusive on both ends (identical for every file).
start_date = '2016-01-01'
end_date = '2022-12-31'

# to_csv does not create missing directories, so make sure the target exists.
os.makedirs(spath, exist_ok=True)

for f in os.listdir(path):
    filepath = os.path.join(path, f)
    # Skip sub-directories and anything that is not a CSV file.
    if not (os.path.isfile(filepath) and f.lower().endswith('.csv')):
        continue

    df = pd.read_csv(filepath)
    df['Date'] = pd.to_datetime(df['Date'])
    savepath = os.path.join(spath, f)

    # Boolean-mask the date window, then order chronologically so that the
    # last row of each monthly group really is the latest date of that month
    # regardless of how the file is ordered. The stable sort keeps the file
    # order among rows that share a date.
    in_window = (df['Date'] >= start_date) & (df['Date'] <= end_date)
    df = df[in_window].sort_values('Date', kind='stable')

    # Group by (year, month) of the date.
    grouped = df.groupby([df['Date'].dt.year, df['Date'].dt.month])

    # Last entry of every month (GroupBy.last takes the last non-null value
    # of each column within the group).
    result = grouped.last()

    # Render the date as "YYYY-MM".
    result['Date'] = result['Date'].dt.strftime('%Y-%m')

    # The (year, month) group index is dropped; 'Date' already carries it.
    result.to_csv(savepath, index=False)

In [None]:
# Collect the workbook paths of the two data folders.
charger_folder = '2016-2022各省份公共充电桩保有量'
gas_folder = '汽油历史调价'
fp1 = get_file_path(charger_folder)
fp2 = get_file_path(gas_folder)

In [None]:
# Sum of the public charging-pile stock column for each workbook in fp1, in
# fp1 order. The list grows with fp1 instead of assuming exactly seven files,
# so extra files no longer raise IndexError and missing files no longer show
# up as a total of 0.
charger_data_sum = []
for charger_path in fp1:
    charger_data = pd.read_excel(charger_path)
    charger_data_sum.append(charger_data['公共充电桩保有量（台）'].sum())

In [None]:
# Load the workbook at position 1 of fp2 and preview its first ten rows.
gas_price_path = fp2[1]
gas_price_data = pd.read_excel(gas_price_path)
gas_price_data.head(10)

In [None]:
# Parse the adjustment-date column to datetimes and use it as the index.
adjustment_dates = pd.to_datetime(gas_price_data['调整日期'])
gas_price_data['调整日期'] = adjustment_dates
gas_price_data = gas_price_data.set_index('调整日期')
gas_price_data.head(10)

In [None]:
# Plot the historical price series against its date index.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(gas_price_data.index, gas_price_data['价格(元/吨)'], label='Historical Oil Prices')
ax.set_title('Historical Oil Prices Over Time')
ax.set_xlabel('Year')
ax.set_ylabel('Oil Price')
ax.legend()
plt.show()

In [None]:
# Reload the raw workbook and key every row by its calendar month (monthly
# Period) so that rows of the same month can be averaged afterwards.
gas_price_data = pd.read_excel(fp2[1])
adjustment_month = pd.to_datetime(gas_price_data['调整日期']).dt.to_period('M')  # keep year-month only
gas_price_data['调整日期'] = adjustment_month
gas_price_data = gas_price_data.set_index('调整日期')
gas_price_data.head(10)

In [None]:
# Average the numeric columns of all rows that fall in the same month.
gas_price_data = gas_price_data.resample('M').mean(numeric_only=True)

# Months without any row come out of the resample as NaN. For the price
# column that means "no adjustment happened", so the previous month's price
# is carried forward; writing 0 there would inject artificial price
# collapses into the series that is modelled later.
gas_price_data['价格(元/吨)'] = gas_price_data['价格(元/吨)'].ffill()

# Any other numeric column keeps the previous behaviour: gaps become 0.
gas_price_data = gas_price_data.fillna(0)

In [None]:
# White-noise check on ARIMA residuals
def noise_detection_test(data, order=(1, 1, 1), lags=20):
    """Fit an ARIMA model, plot its residuals and run a Ljung-Box test on them.

    Parameters
    ----------
    data : array-like or pandas.Series
        Series handed to ``ARIMA``.
    order : tuple of int, optional
        ARIMA ``(p, d, q)`` order. Defaults to ``(1, 1, 1)``.
    lags : int, optional
        Single lag at which the Ljung-Box statistic is evaluated.
        Defaults to 20.

    Returns
    -------
    tuple of float
        ``(lb_stat, lb_p_value)`` for the requested lag. The same two values
        are printed.
    """
    # Fit the ARIMA model.
    model = ARIMA(data, order=order)
    model_fit = model.fit()

    # Residuals of the fitted model.
    residuals = model_fit.resid

    # Plot the residuals.
    plt.plot(residuals)
    plt.title('Residuals of ARIMA Model')
    plt.xlabel('Time')
    plt.ylabel('Residuals')
    plt.show()

    # Ljung-Box test. The result is a table with one row per requested lag;
    # a single lag is requested, so take that row's scalar values (printing
    # the columns' types, as before, reported nothing useful).
    lb_result = acorr_ljungbox(residuals, lags=[lags])
    lb_stat = float(lb_result['lb_stat'].iloc[0])
    lb_p_value = float(lb_result['lb_pvalue'].iloc[0])
    print(f"Ljung-Box test stat: {lb_stat}, p-value: {lb_p_value}")
    return lb_stat, lb_p_value

# Run the residual noise check on the price column of gas_price_data.
noise_detection_test(gas_price_data['价格(元/吨)'])

In [None]:
# Search for the ARIMA order with time-series cross-validation
def find_best_arima_model(data, n_splits=5, order_candidates=None):
    """Pick the ARIMA ``(p, d, q)`` order with the lowest cross-validated MSE.

    Every candidate order is fitted on the training part of each
    ``TimeSeriesSplit`` fold and scored by the mean squared error of its
    forecast over that fold's test part. Candidates are ranked by their MSE
    averaged over all folds, so a single easy fold cannot decide the winner.

    Parameters
    ----------
    data : pandas.Series or array-like
        Series to model. Rows are selected by position.
    n_splits : int, optional
        Number of ``TimeSeriesSplit`` folds. Defaults to 5.
    order_candidates : iterable of (p, d, q), optional
        Orders to evaluate. Defaults to p in 0..2, d in 0..1, q in 0..2.

    Returns
    -------
    tuple
        ``(best_order, best_mse)``. ``best_order`` is ``None`` and
        ``best_mse`` is ``inf`` when no candidate could be fitted and scored
        on every fold. The same values are printed.
    """
    if order_candidates is None:
        order_candidates = [(p, d, q) for p in range(3) for d in range(2) for q in range(3)]
    # Hashable, de-duplicated, original order preserved.
    order_candidates = list(dict.fromkeys(tuple(order) for order in order_candidates))

    def _rows(positions):
        # Positional selection: the fold indices are row positions, not index
        # labels, so a pandas object must go through .iloc.
        return data.iloc[positions] if hasattr(data, 'iloc') else data[positions]

    tscv = TimeSeriesSplit(n_splits=n_splits)
    fold_errors = {order: [] for order in order_candidates}
    n_folds = 0

    for train_index, test_index in tscv.split(data):
        n_folds += 1
        train, test = _rows(train_index), _rows(test_index)

        for order in order_candidates:
            try:
                model_fit = ARIMA(train, order=order).fit()
                predictions = model_fit.forecast(steps=len(test))
                fold_errors[order].append(mean_squared_error(test, predictions))
            except Exception:
                # Best effort: an order that cannot be fitted or scored on
                # this fold is skipped. KeyboardInterrupt/SystemExit are
                # deliberately not swallowed.
                continue

    best_mse = np.inf
    best_order = None
    for order in order_candidates:
        errors = fold_errors[order]
        # Only compare orders that were scored on every fold; otherwise the
        # averages would be taken over different test windows.
        if n_folds == 0 or len(errors) != n_folds:
            continue
        mean_mse = float(np.mean(errors))
        if mean_mse < best_mse:
            best_mse = mean_mse
            best_order = order

    print(f"Best ARIMA Order: {best_order}, Best MSE: {best_mse}")
    return best_order, best_mse

In [None]:
# Display the current contents of gas_price_data.
gas_price_data

In [None]:

# Run the ARIMA order selection on the price column.
find_best_arima_model(gas_price_data['价格(元/吨)'])

In [None]:
# Split by position: the first 80% of the rows train, the rest test.
train_size = int(len(gas_price_data) * 0.8)
train = gas_price_data.iloc[:train_size]
test = gas_price_data.iloc[train_size:]

In [None]:
# Fit an ARIMA model on the training rows.
order = (5, 1, 0)  # example order only; choose suitable ARIMA parameters
price_column = '价格(元/吨)'
model = ARIMA(train[price_column], order=order)
model_fit = model.fit()

# Forecast one step per test row.
n_test = len(test)
predictions = model_fit.forecast(steps=n_test)

# Score the forecast against the held-out prices.
mse = mean_squared_error(test[price_column], predictions)
print(f'Mean Squared Error: {mse}')