In [13]:
import os
from datetime import datetime, timedelta

import cx_Oracle
# cx_Oracle.init_oracle_client(lib_dir=r"/Users/kaka/Downloads/instantclient_18_1")
import pandas as pd
from skforecast.ForecasterAutoreg import ForecasterAutoreg
from sklearn.linear_model import Ridge


class SkuDemodsPred(object):
    """Forecast daily demand per SKU series and aggregate the forecast by month.

    A series is identified by the (零件号, 仓库代码, 需求流) triple. Each series is
    zero-filled onto a daily grid, fitted with its own autoregressive Ridge
    forecaster, and the forecast is summed per calendar month.
    """

    def __init__(self):
        # Read the clock once so both boundaries derive from the same instant.
        now = datetime.now()
        # Last day of the daily grid, and the cutoff used when not back-testing.
        self.now_time_future = now.strftime("%Y-%m-%d")
        # Cutoff used by model_pred(..., 'train'): 180 days before today.
        self.end_time_train = (now - timedelta(days=6 * 30)).strftime("%Y-%m-%d")

        self.lags = 24         # passed as `lags` to every ForecasterAutoreg
        self.steps = 180       # forecast horizon passed to predict(), in daily steps
        self.batch_size = 200  # number of series per progress message

    def mysql_info(self):
        """Open and return a new cx_Oracle connection.

        Despite the method name, the target is an Oracle database. Settings are
        read from environment variables so the password is not stored in the
        notebook:

        * ORACLE_USER     (default 'nppbuf')
        * ORACLE_PASSWORD (required)
        * ORACLE_HOST     (default '10.122.6.59')
        * ORACLE_PORT     (default 1521)
        * ORACLE_SERVICE  (default 'b2bt')

        Returns:
            An open cx_Oracle connection. The caller must close it.

        Raises:
            RuntimeError: if ORACLE_PASSWORD is not set.
        """
        username = os.environ.get('ORACLE_USER', 'nppbuf')
        password = os.environ.get('ORACLE_PASSWORD')
        if not password:
            raise RuntimeError(
                "Set the ORACLE_PASSWORD environment variable before connecting."
            )
        hostname = os.environ.get('ORACLE_HOST', '10.122.6.59')
        port = int(os.environ.get('ORACLE_PORT', 1521))
        service_name = os.environ.get('ORACLE_SERVICE', 'b2bt')
        dsn = cx_Oracle.makedsn(hostname, port, service_name=service_name)
        # Connect to the Oracle database.
        return cx_Oracle.connect(user=username, password=password, dsn=dsn)

    def data_preprocessing(self, data, start_date='2022-01-01'):
        """Zero-fill every series onto a continuous daily grid.

        Args:
            data: DataFrame with the columns '零件号', '仓库代码', '需求流' and '日期'.
                NOTE(review): '日期' presumably has to be datetime-like for the
                reindex to match existing rows - confirm against the caller.
            start_date: first day of the grid; the grid ends at
                ``self.now_time_future``.

        Returns:
            One row per (series, day). The day is in a column named 'index'.
            Days without a source row hold 0 in every non-key column. An empty
            DataFrame is returned when ``data`` has no groups.
        """
        date_range = pd.date_range(start=start_date, end=self.now_time_future, freq='D')
        key_cols = ['零件号', '仓库代码', '需求流']
        # Collect the per-series frames and concatenate once: DataFrame.append
        # was removed in pandas 2.0 and copied the accumulated frame every time.
        filled_groups = []
        for sku, group in data.groupby(key_cols):
            # NOTE(review): reindex raises if a series has two rows for the same
            # day - aggregate per day upstream if that can happen.
            sku_group = group.set_index('日期').reindex(date_range, fill_value=0).reset_index()
            # fill_value=0 also overwrote the key columns on filled days; restore them.
            for col, value in zip(key_cols, sku):
                sku_group[col] = value
            filled_groups.append(sku_group)
        if not filled_groups:
            return pd.DataFrame()
        return pd.concat(filled_groups, ignore_index=True)

    def feature_processing(self, data, end_time):
        """Pivot the zero-filled demand into one daily column per series and split it.

        Args:
            data: raw demand rows, as accepted by ``data_preprocessing``; must
                also contain '需求数量'.
            end_time: last day (inclusive) of the training window.

        Returns:
            ``(data_train, data_test)``. Both are indexed by a daily 'date'
            index with one column per '零件号_仓库代码_需求流' key. ``data_train``
            ends at ``end_time``; ``data_test`` starts on the following row, so
            the two never share a day.
        """
        filled = self.data_preprocessing(data)
        # astype(str) keeps the concatenation working when a key column is numeric.
        filled['合并列'] = (
            filled['零件号'].astype(str)
            + '_' + filled['仓库代码'].astype(str)
            + '_' + filled['需求流'].astype(str)
        )
        wide = filled.set_index(['index', '合并列'])['需求数量'].unstack()
        wide.columns.name = None
        wide.index = pd.to_datetime(wide.index)
        wide.index.name = 'date'
        new_df = wide.asfreq('D').sort_index()

        data_train = new_df.loc[:end_time, :].copy()
        # Label slices are inclusive on both ends, so slicing the test set with
        # ``loc[end_time:]`` would repeat the cutoff day. new_df is sorted and the
        # training set is its prefix, so the test set is simply the remainder.
        data_test = new_df.iloc[len(data_train):, :].copy()
        return data_train, data_test

    def demods_groby_month(self, data, type):
        """Sum daily values per calendar month and return them in long format.

        Args:
            data: wide frame with one column per series and the dates either in
                the index (named 'date' or not) or in a 'date' column.
            type: name given to the value column of the result (the parameter
                name shadows the builtin but is kept for existing callers).

        Returns:
            DataFrame with the columns 'year', 'month', '零件号' and ``type``.
        """
        # Forecast frames may carry an unnamed DatetimeIndex; give it the name
        # the rest of this method relies on.
        if data.index.name != 'date' and 'date' not in data.columns:
            data = data.rename_axis(index='date')
        data_prs = data.reset_index()
        dates = pd.to_datetime(data_prs['date'])
        data_prs["year"] = dates.dt.year.astype(int)
        data_prs["month"] = dates.dt.month.astype(int)
        # Drop the datetime column before summing: pandas >= 2.0 no longer
        # silently discards columns that cannot be summed.
        monthly = data_prs.drop(columns=['date']).groupby(['year', 'month']).sum()
        data_prs_info = monthly.stack()
        data_prs_info = data_prs_info.rename_axis(index=['year', 'month', '零件号'])
        data_prs_info.name = type
        return data_prs_info.reset_index()

    def model_pred(self, data, type):
        """Fit one forecaster per series and return the monthly forecast.

        Args:
            data: raw demand rows, as accepted by ``feature_processing``.
            type: 'train' cuts the history at ``self.end_time_train``; any other
                value uses the history up to ``self.now_time_future``.

        Returns:
            Long-format monthly forecast with the columns 'year', 'month',
            '零件号' and 'pred_values'.
        """
        end_time = self.end_time_train if type == 'train' else self.now_time_future

        data_train, _ = self.feature_processing(data, end_time)
        sku_list = data_train.columns
        # Ceiling division: an exact multiple of batch_size must not report an
        # extra batch in the progress message.
        total_batches = (len(sku_list) + self.batch_size - 1) // self.batch_size

        # Forecast right after fitting so only the forecasts, not every fitted
        # model, are kept in memory.
        forecasts = {}
        for batch_no, start in enumerate(range(0, len(sku_list), self.batch_size), start=1):
            for sku in sku_list[start:start + self.batch_size]:
                forecaster = ForecasterAutoreg(
                    regressor=Ridge(random_state=123),
                    lags=self.lags,
                )
                forecaster.fit(y=data_train[sku])
                forecasts[sku] = forecaster.predict(steps=self.steps)
            print(f"Finished training batch {batch_no}/{total_batches}")

        # Build the frame in one step instead of inserting one column at a time.
        predictions = pd.DataFrame(forecasts)
        # Treat forecasts below 0.1 (including negatives) as zero demand.
        predictions[predictions < 0.1] = 0
        return self.demods_groby_month(predictions, 'pred_values')

    def main(self, data):
        """Run the back-test forecast and append it to 't_dd_forecast_detail'."""
        predictions = self.model_pred(data, 'train')
        # Open the connection only once the forecast exists, and always release it.
        conn = self.mysql_info()
        try:
            # NOTE(review): pandas documents to_sql for SQLAlchemy connectables and
            # sqlite3 connections; confirm it works with a raw cx_Oracle connection.
            predictions.to_sql('t_dd_forecast_detail', conn, if_exists='append')
        finally:
            conn.close()

In [14]:
# Helper object; it also owns the database connection settings.
ods = SkuDemodsPred()

# Demand history (current + archived) for one part/warehouse pair, joined with
# its ABC classifications.
sql_query = """  
     select dh.hostpartid,                                 --零件号  
       dh.hostlocid,                                  --仓库代码
       dh.historybegdate,          --年月日
       dh.dshostid,                                     --需求流
       dh.historyamount,                              --需求数量
       sa.abc_hits_network,                           --ABC需求频次分类
       sa.abc_cost,                                   --ABC价格分类
       sa.abc_demand,                                 --ABC需求数量分类
       substr(to_char(dh.hostpartid), 4, 1)           --零件号第四位
  from (select * from nppbuf.t_dd_demand_detail
        union all select * from nppbuf.t_dd_demand_detail_history) dh
  left join nppcore.ipcs_part_master pm
    on dh.hostpartid = pm.partnumber
  left join nppcore.ipcs_loc_master lm
    on dh.hostlocid = lm.hostlocid
  left join nppcore.ipcscust_stock_amount sa
    on pm.partid = sa.partid
   and lm.locid = sa.locid
   where dh.hostpartid = '06F253039G'  and dh.hostlocid='8012' 
"""
# Number of rows returned per chunk; adjust as needed.
chunksize = 100000

connection = ods.mysql_info()
try:
    # Run the query and pull the result in chunks.
    chunks = []
    for chunk in pd.read_sql(sql_query, connection, chunksize=chunksize):
        print(chunk.shape)
        chunks.append(chunk)
finally:
    # Close the database connection even when the query or a fetch fails.
    connection.close()

# Concatenate once: DataFrame.append was removed in pandas 2.0 and re-copied the
# accumulated frame on every iteration. ignore_index gives the combined frame a
# unique 0..n-1 index regardless of how the chunks were numbered.
data = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
# Show the combined DataFrame.
data



(464, 9)


  data = data.append(chunk)


Unnamed: 0,HOSTPARTID,HOSTLOCID,HISTORYBEGDATE,DSHOSTID,HISTORYAMOUNT,ABC_HITS_NETWORK,ABC_COST,ABC_DEMAND,"SUBSTR(TO_CHAR(DH.HOSTPARTID),4,1)--零件号第四位"
0,06F253039G,8012,20230531,DEALER,1,A,C,ZERO,2
1,06F253039G,8012,20230102,DEALER,1,A,C,ZERO,2
2,06F253039G,8012,20230325,DEALER,1,A,C,ZERO,2
3,06F253039G,8012,20220913,DEALER,1,A,C,ZERO,2
4,06F253039G,8012,20221112,DEALER,1,A,C,ZERO,2
...,...,...,...,...,...,...,...,...,...
459,06F253039G,8012,20220122,DEALER,1,A,C,ZERO,2
460,06F253039G,8012,20220108,DEALER,1,A,C,ZERO,2
461,06F253039G,8012,20220302,DEALER,1,A,C,ZERO,2
462,06F253039G,8012,20220619,DEALER,1,A,C,ZERO,2


In [15]:
# Replace the raw query headers with business names, positionally, in the order
# the columns appear in the SELECT list.
# NOTE(review): the date column is named '年月' here, while
# SkuDemodsPred.data_preprocessing indexes on '日期' - confirm a later step
# renames/converts it before the frame is passed to the class.
data.columns = [
    '零件号',
    '仓库代码',
    '年月',
    '需求流',
    '需求数量',
    'ABC需求频次分类',
    'ABC价格分类',
    'ABC需求数量分类',
    '零件号第四位',
]
data.head()

Unnamed: 0,零件号,仓库代码,年月,需求流,需求数量,ABC需求频次分类,ABC价格分类,ABC需求数量分类,零件号第四位
0,06F253039G,8012,20230531,DEALER,1,A,C,ZERO,2
1,06F253039G,8012,20230102,DEALER,1,A,C,ZERO,2
2,06F253039G,8012,20230325,DEALER,1,A,C,ZERO,2
3,06F253039G,8012,20220913,DEALER,1,A,C,ZERO,2
4,06F253039G,8012,20221112,DEALER,1,A,C,ZERO,2
