In [2]:
#准备工作
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf,plot_pacf
import itertools
from sklearn.metrics import r2_score as rs
import warnings

warnings.filterwarnings("ignore")#忽略输出警告
plt.rcParams["font.sans-serif"]=["SimHei"]#用来正常显示中文标签
plt.rcParams["axes.unicode_minus"]=False#用来显示负号

%matplotlib inline
#在jupyter中显示figure，该语句只在jupyter上有用

In [5]:
sales_df = pd.read_csv("sql/glass_processing_sales_data_single_company.csv",
                       parse_dates=['销售日期'])

# 按产品和时间排序
sales_df = sales_df.sort_values(['产品名称', '销售日期'])

# 创建时间序列特征
sales_df['year'] = sales_df['销售日期'].dt.year
sales_df['month'] = sales_df['销售日期'].dt.month
sales_df['week'] = sales_df['销售日期'].dt.isocalendar().week

In [15]:
products = sales_df['产品名称'].unique()
targets = ['销售价格', '销售额', '净利率']

# 选取第一个产品和目标
product = products[0]
target = targets[0]

product_data = sales_df[sales_df['产品名称'] == product]
y = product_data.set_index('销售日期')[target]

product_data.head()
print(product_data)

# #折线图
# fig, ax=plt.subplots(figsize=(15,15))
# y.plot(ax=ax,fontsize=15)
# ax.set_title("ILI率",fontsize=25)
# ax.set_xlabel("时间（周）",fontsize=25)
# ax.set_ylabel("ILI率%",fontsize=25)
# ax.legend(loc="best",fontsize=15)
# ax.grid()

           企业名称    销售区域   产品名称  销售数量   原价    销售价格       销售额  净利率       销售日期
10361  铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃    15  450  1125.0   16875.0  0.6 2024-01-01
10850  铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃   106  450  1125.0  119250.0  0.6 2024-01-01
11146  铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃    63  450  1125.0   70875.0  0.6 2024-01-01
3154   铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃    99  450  1125.0  111375.0  0.6 2024-01-02
7803   铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃    57  450  1125.0   64125.0  0.6 2024-01-02
...         ...     ...    ...   ...  ...     ...       ...  ...        ...
6092   铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃   102  450  1125.0  114750.0  0.6 2025-01-01
8108   铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃    75  450  1125.0   84375.0  0.6 2025-01-01
8869   铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃    42  450  1125.0   47250.0  0.6 2025-01-01
9883   铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃    58  450  1125.0   65250.0  0.6 2025-01-01
10675  铜仁玻璃有限公司  贵州省铜仁市  低辐射玻璃    29  450  1125.0   32625.0  0.6 2025-01-01

[1791 rows x 9 columns]


In [None]:
#LB白噪声检验
from statsmodels.stats.diagnostic import acorr_ljungbox 
def test_white_noise(data,alpha):
    [[lb],[p]]=acorr_ljungbox(data,lags=1)
    if p<alpha:
        print('LB白噪声检验结果：在显著性水平%s下，数据经检验为非白噪声序列'%alpha)
    else:
        print('LB白噪声检验结果：在显著性水平%s下，数据经检验为白噪声序列'%alpha) 


In [None]:
# 白噪声检验

In [195]:
#平稳性检验
#自定义函数用于ADF检查平稳性
from statsmodels.tsa.stattools import adfuller as ADF
def test_stationarity(timeseries,alpha):#alpha为检验选取的显著性水平
    adf=ADF(timeseries)
    p=adf[1]#p值
    critical_value=adf[4]["5%"]#在95%置信区间下的临界的ADF检验值
    test_statistic=adf[0]#ADF统计量
    if p<alpha and test_statistic<critical_value:
        print("ADF平稳性检验结果：在显著性水平%s下，数据经检验平稳"%alpha)
        return True
    else:
        print("ADF平稳性检验结果：在显著性水平%s下，数据经检验不平稳"%alpha)
        return False

In [None]:
# 添加时间序列特征和滞后特征
for lag in [1, 2, 3, 4]:
    product_data[f'lag_{lag}'] = product_data[target].shift(lag)

# 添加移动平均特征
product_data['rolling_3m_mean'] = product_data[target].rolling(window=3).mean()

# 添加季节特征
product_data['quarter'] = product_data['销售日期'].dt.quarter

# 删除空值
product_data = product_data.dropna()

In [223]:
#建立模型
model=sm.tsa.SARIMAX(NGE,order=(1, 1, 1), seasonal_order=(1, 1, 1, 52))
SARIMA_m=model.fit()
print(SARIMA_m.summary())

In [226]:
pred=SARIMA_m.get_prediction(start=0,dynamic=False,full_results=True)

forecast=SARIMA_m.get_forecast(steps=100)

#预测整体可视化
fig,ax=plt.subplots(figsize=(20,16))
NGE.plot(ax=ax,label="base data")
forecast.predicted_mean.plot(ax=ax,label="forecast data")
#ax.fill_between(forecast.conf_int().index(),forecast.conf_int().iloc[:,0],\
#               forecast.conf_int().iloc[:,1],color='grey',alpha=0.15,label='confidence interval')
ax.legend(loc="best",fontsize=20)
ax.set_xlabel("时间（周）",fontsize=20)
ax.set_ylabel("ILI率%",fontsize=18)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)


# plt.figure(figsize=(10,8))
# plt.plot(NGE, label='原数据')
# plt.plot(forecast, label='prediction数据')
# plt.legend()
# plt.show

In [227]:
#模型检验
# test_white_noise(SARIMA_m.resid,0.05)#SARIMA_m.resid提取模型残差，并检验是否为白噪声
fig=SARIMA_m.plot_diagnostics(figsize=(15,12))#plot_diagnostics对象允许我们快速生成模型诊断并调查任何异常行为

In [244]:
#模型预测
from sklearn.metrics import mean_squared_error as mse
from sklearn.metrics import mean_absolute_error as mae
#获取预测结果，自定义预测误差
def PredictionAnalysis(data,model,start,dynamic=False):
    pred=model.get_prediction(start=start,dynamic=dynamic,full_results=True)
    pci=pred.conf_int()#置信区间
    pm=pred.predicted_mean#预测值
    truth=data[start:]#真实值
    pc=pd.concat([truth,pm,pci],axis=1)#按列拼接
    pc.columns=['true','pred','up','low']#定义列索引
    print(type(truth))
    print(type(pm))
    print("1、MSE:{}".format(mse(truth,pm)))
    print("2、RMSE:{}".format(np.sqrt(mse(truth,pm))))
    print("3、MAE:{}".format(mae(truth,pm)))
    return pc
#绘制预测结果
def PredictonPlot(pc):
    plt.figure(figsize=(10,8))
    plt.fill_between(pc.index,pc['up'],pc['low'],color='grey',
                     alpha=0.15,label='confidence interval')#画出置信区间
    plt.plot(pc['true'],label='base data')
    plt.plot(pc['pred'],label='prediction curve')
    plt.legend()
    plt.show
    return True

In [229]:
pred=SARIMA_m.get_prediction(start=0,dynamic=False,full_results=True)
pm=pred.predicted_mean
print(pm)

In [232]:
pred = SARIMA_m.get_prediction(start=200, dynamic=False) #预测值
# forecast=SARIMA_m.get_forecast(steps=50)

pred_ci = pred.conf_int() #置信区间
 
#画出预测值和真实值的plot图

ax = NGE[0:].plot(label='真实值')
pred.predicted_mean.plot(ax=ax, label='预测值', alpha=.7)

ax.fill_between(pred_ci.index,
                pred_ci.iloc[:, 0],
                pred_ci.iloc[:, 1], color='k', alpha=.2)

ax.set_xlabel('周')
ax.set_ylabel('ILI率')
plt.legend()

plt.show()

In [245]:
#静态预测：进行一系列的一步预测，即它必须用真实值来进行预测
pred=PredictionAnalysis(NGE,SARIMA_m,100)
PredictonPlot(pred)

In [243]:
#动态预测：进行多步预测，除了第一个预测值是用实际值预测外，其后各预测值都是采用递推预测
pred=PredictionAnalysis(NGE,SARIMA_m,200,dynamic=True)
PredictonPlot(pred)

In [173]:
#预测未来
forecast=SARIMA_m.get_forecast(steps=50)
#预测整体可视化
fig,ax=plt.subplots(figsize=(20,16))
NGE.plot(ax=ax,label="base data")
forecast.predicted_mean.plot(ax=ax,label="forecast data")
#ax.fill_between(forecast.conf_int().index(),forecast.conf_int().iloc[:,0],\
#               forecast.conf_int().iloc[:,1],color='grey',alpha=0.15,label='confidence interval')
ax.legend(loc="best",fontsize=20)
ax.set_xlabel("时间（周）",fontsize=20)
ax.set_ylabel("ILI率%",fontsize=18)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
plt.show

In [58]:
import xgboost as xgb

In [238]:
# 建立XGBoost模型
xgb_model = xgb.XGBRegressor(n_estimators=10, max_depth=3, learning_rate=0.1)
# 假设你已经有了预测因子和目标变量的数据，这里假设预测因子为X，目标变量为y
xgb_model.fit(NGE, temperature)

In [111]:
from sklearn.datasets import load_iris
from sklearn.metrics import mean_squared_error, accuracy_score
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(temperature, NGE, test_size=0.3, random_state=23)
dtrain = xgb.DMatrix(temperature, label=NGE)
dtest = xgb.DMatrix(temperature, label=NGE)
# 设置参数
params = {
    'eta': 0.3,#学习率
    # 'reg_alpha': 0.01,
    # 'reg_lambda': 0.01,
    'max_depth': 100
}

# 训练模型
bst = xgb.train(
    params=params,
    dtrain=dtrain,
    num_boost_round=20
)

# 预测结果
ypred = bst.predict(dtest)
print('MSE of prediction on boston dataset:', mean_squared_error(NGE, ypred))
print('\n')
# print(NGE)
# print(ypred)
plt.figure(figsize=(10,8))
plt.plot(NGE, label='原数据')
plt.plot(ypred, label='prediction数据')
plt.legend()
plt.show



# 
# fea_imp = xgbc.feature_importances_
# print(fea_imp)

# 
# ## 使用原生xgboost解决分类问题
# # 读取数据
# iris_data = load_iris()
# X = iris_data.data
# y = iris_data.target
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=23)
# 
# dtrain = xgb.DMatrix(X_train, label=y_train)
# dtest = xgb.DMatrix(X_test, label=y_test)
# 
# # 设置参数
# params = {
#     'objective': 'multi:softmax',
#     'num_class': 3,
#     'eta': 0.1,
#     'reg_alpha': 0.01,
#     'reg_lambda': 0.01,
#     'max_depth': 8
# }
# 
# # 训练模型
# bst = xgb.train(
#     params=params,
#     dtrain=dtrain,
#     num_boost_round=20,
#     evals=[(dtrain, 'train'), (dtest, 'test')] # 将训练数据和测试数据都作为验证集，可以实时监督训练情况，是否过拟合
# )

# # 预测结果
# result = bst.predict(
#     dtest,
#     ntree_limit=10
# )
# print('Accuracy of prediction on iris dataset:', accuracy_score(y_test, result))

In [241]:
def update_weights(sarima_pred, xgb_pred, P, Q, R):
    # 计算卡尔曼增益
    K = P / (P + R)
    # 更新权重
    updated_weight = sarima_pred + K * (xgb_pred - sarima_pred)
    # 更新P
    P = P - K * P + Q
    return updated_weight, P


# 初始化P，Q，R
P = 1
Q = 0.1
R = 0.1
# 假设有两个模型的预测结果sarima_pred和xgb_pred
sarima_pred = pm
xgb_pred = ypred
# 更新权重


updated_weight, P = update_weights(sarima_pred, xgb_pred, P, Q, R)
print("Updated Weight:", updated_weight)
final_prediction = updated_weight * sarima_pred + (1 - updated_weight) * xgb_pred
print("Final Prediction:", final_prediction)


plt.figure(figsize=(10,8))
plt.plot(NGE, label='原数据')
plt.plot(updated_weight, label='prediction数据')
plt.legend()
plt.show



In [240]:
from sklearn.metrics import mean_squared_error as mse
from sklearn.metrics import mean_absolute_error as mae
#获取预测结果，自定义预测误差
def PredictionAnalysis(data,model,start,dynamic=False):
    pred=model.get_prediction(start=start,dynamic=dynamic,full_results=True)
    pci=pred.conf_int()#置信区间
    pm=pred.predicted_mean#预测值
    truth=data[start:]#真实值
    pc=pd.concat([truth,pm,pci],axis=1)#按列拼接
    pc.columns=['true','pred','up','low']#定义列索引
    print("1、MSE:{}".format(mse(truth,pm)))
    print("2、RMSE:{}".format(np.sqrt(mse(truth,pm))))
    print("3、MAE:{}".format(mae(truth,pm)))
    return pc
#绘制预测结果
def PredictonPlot(pc):
    plt.figure(figsize=(10,8))
    plt.fill_between(pc.index,pc['up'],pc['low'],color='grey',
                     alpha=0.15,label='confidence interval')#画出置信区间
    plt.plot(pc['true'],label='base data')
    plt.plot(pc['pred'],label='prediction curve')
    plt.legend()
    plt.show
    return True