## Note

- 입고 유형 별로 모델 필요
    - 발주입고 (거래처 to 센터)
    - 이동입고 (서브센터 또는 통합물류센터 to 각 센터)
    - 강북서브센터/강남서브센터/김포통합물류센터
    - 통합물류센터는 2020/6/1에 오픈 -> 초기모델에서는 제외
- 단위: 센터별/일별/유형별
- 예측기간: 근미래 7일
- 데이터 제공주기
    - 일 1회(오전 시간대)
- 전달 포맷
    - 임시 수동작업: https://docs.google.com/spreadsheets/d/1zxNTpYxFzus8v4lHAp2LNO3B-LzJw9-b-k35TShTAok/edit#gid=0
    - 추후 테이블 자동화 작업 필요 -> 데이터플랫폼팀에 요청
- dtype='rcvng_ord': 발주입고
- dtype='rcvng_invtrford': 이동입고
- 센터 in조건: B마트 강남, B마트 노원, B마트 영등포, B마트 용산, B마트 마포, B마트 광진, B마트 송파, B마트 강북    

In [1]:
import os
import findspark
from pyspark.sql import SparkSession, SQLContext
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from lightgbm import LGBMRegressor

# Regressor instance that is later passed to BmartWarehousePredModel.
lgb_reg = LGBMRegressor(objective='regression')

# Tell findspark where the Spark 2 client lives before initialising it.
os.environ['SPARK_HOME'] = '/usr/hdp/current/spark2-client/'
findspark.init()

# NOTE: despite its name, `sc` is a SparkSession (kept for compatibility with
# the rest of the notebook), not a SparkContext.
sc = (
    SparkSession.builder
    .master("yarn")
    .appName("bmart_warehouse_predict_test")
    .enableHiveSupport()
    .getOrCreate()
)

# SQLContext's first argument must be a SparkContext. The original passed the
# SparkSession itself, which only worked by accident. Hand over the real
# context and bind the existing session explicitly.
spark = SQLContext(sc.sparkContext, sparkSession=sc)

In [2]:
class BmartWarehousePredModel:
    """Per-center daily receiving-quantity forecaster.

    Training history and the holiday calendar are read through Spark SQL.
    ``fit_predict`` then fits one regressor per center for a given ``dtype``
    and predicts ``_PRED_DAYS`` consecutive days starting at the reference
    date.
    """

    # Number of forecast days, counting the reference date itself.
    _PRED_DAYS = 7

    def __init__(self, reg_model, time_period, std_dt=None):
        """Set up the model template and the date window.

        Args:
            reg_model: Regressor exposing ``fit(X, y)`` and ``predict(X)``.
                It is deep-copied for every center, so the instance passed in
                is never fitted itself. ``None`` falls back to
                ``LGBMRegressor(objective='regression')``.
            time_period: Length of the training window in days.
            std_dt: Optional reference date (anything ``pd.Timestamp``
                accepts). Training data ends the day before it and the
                forecast starts on it. Defaults to today according to the
                local clock of the machine.
        """
        self._reg_model = reg_model
        # Evaluate "now" exactly once so the three derived dates cannot
        # disagree when the object is created around midnight.
        base = pd.Timestamp.now() if std_dt is None else pd.Timestamp(std_dt)
        base = base.normalize()
        self._train_start_dt = (base - timedelta(days=time_period)).strftime("%Y-%m-%d")
        self._std_dt = base.strftime("%Y-%m-%d")
        self._pred_end_dt = (base + timedelta(days=self._PRED_DAYS - 1)).strftime("%Y-%m-%d")

    def fetch_train_df(self, spark):
        """Fetch the training data.

        Runs an aggregation over ``market.inventory_history`` joined with
        ``market.bm_shop`` for ``[train_start_dt, std_dt)`` and returns the
        result as a pandas DataFrame with the columns ``part_date``,
        ``center_id``, ``name``, ``dtype`` and ``qty_sum``.

        Args:
            spark: Object exposing ``sql(query)`` whose result has
                ``toPandas()``.
        """
        # NOTE(review): the address filter literal below may be a typo of the
        # company name; it is left unchanged here -- confirm with the data owner.
        sql = """
            select
                part_date,            
                center_id,
                name,
                dtype,
                cast(sum(qty_io) as int) qty_sum
            from market.inventory_history A
                inner join market.bm_shop B 
                    on A.center_id = B.shop_id
                    and B.name not like '%우형%'
                    and B.address NOT LIKE '%우아한형제도%'
                    and B.status = 'ACTIVE'                                
            where part_date >= '{start_dt}'
                and part_date < '{end_dt}'
                and dtype in ('rcvng_ord', 'rcvng_invtrford')
                and center_id in ('2', '21', '10', '3', '9', '20', '1', '22')
            group by 
                part_date,
                center_id,
                name,
                dtype             
            order by 
                part_date,
                center_id,
                name,
                dtype
                """.format(start_dt=self._train_start_dt, end_dt=self._std_dt)
        return spark.sql(sql).toPandas()

    def fetch_holiday_df(self, spark):
        """Fetch the holiday table.

        Returns a pandas DataFrame with ``part_date`` and a 0/1
        ``holiday_yn`` flag covering ``[train_start_dt, pred_end_dt]``.

        Args:
            spark: Object exposing ``sql(query)`` whose result has
                ``toPandas()``.
        """
        # The upper bound is inclusive: the forecast range built in
        # _create_pred_period contains pred_end_dt, so an exclusive bound
        # would leave the last forecast day without a holiday flag.
        sql = """
              select
                  date_cd as part_date,
                  if(holiday_yn = 'Y', 1, 0) as holiday_yn
              from sbbi.dim_date_cd
              where date_cd >= '{start_dt}'
                  and date_cd <= '{end_dt}'
              """.format(start_dt=self._train_start_dt, end_dt=self._pred_end_dt)
        return spark.sql(sql).toPandas()

    @staticmethod
    def _with_datetime(frame):
        """Return a copy of *frame* whose ``part_date`` column is datetime."""
        return frame.assign(part_date=pd.to_datetime(frame['part_date']))

    def _new_model(self):
        """Return a fresh, independent regressor for a single center."""
        import copy

        if self._reg_model is None:
            return LGBMRegressor(objective='regression')
        return copy.deepcopy(self._reg_model)

    def _preprocessing(self, df, holiday, dtype):
        """Pre-process the history for one receiving type.

        Parses the dates, adds ``dayofweek`` and ``holiday_yn``, keeps only
        the rows of *dtype* and drops the ``dtype`` and ``name`` columns.
        Neither *df* nor *holiday* is modified.
        """
        # Work on copies so repeated calls with the same inputs (one per
        # dtype) never see a frame altered by an earlier call.
        data = self._with_datetime(df)
        calendar = self._with_datetime(holiday)
        data['dayofweek'] = data['part_date'].dt.dayofweek
        data = data.merge(calendar, on='part_date', how='left')
        return data[data['dtype'] == dtype].drop(['dtype', 'name'], axis=1)

    def _create_pred_period(self, center_id, holiday):
        """Build the placeholder rows for the forecast period of one center.

        The frame has one row per day in ``[std_dt, pred_end_dt]`` with
        ``qty_sum`` set to NaN, plus ``dayofweek`` and ``holiday_yn``.
        *holiday* is not modified.
        """
        pred_df = pd.DataFrame(
            {'part_date': pd.date_range(start=self._std_dt, end=self._pred_end_dt)}
        )
        pred_df['center_id'] = center_id
        pred_df['qty_sum'] = np.nan
        pred_df['dayofweek'] = pred_df['part_date'].dt.dayofweek
        return pred_df.merge(self._with_datetime(holiday), on='part_date', how='left')

    def fit_predict(self, df, holiday, dtype):
        """Train one model per center and return the forecasts.

        Args:
            df: History as returned by ``fetch_train_df``.
            holiday: Calendar as returned by ``fetch_holiday_df``.
            dtype: Receiving type to model (a value of the ``dtype`` column).

        Returns:
            DataFrame with ``part_date``, ``center_id`` and the rounded
            integer prediction ``qty_sum``; rows are indexed 0..n-1 within
            each center. Empty (same columns) when *df* has no row for
            *dtype*.
        """
        df_type = self._preprocessing(df, holiday, dtype)
        std_ts = pd.Timestamp(self._std_dt)
        non_feature_cols = ['center_id', 'part_date', 'qty_sum']

        per_center = []
        for center_id in df_type['center_id'].drop_duplicates().values:
            # History of this center followed by the placeholder forecast rows.
            history = df_type[df_type['center_id'] == center_id]
            pred_period = self._create_pred_period(center_id, holiday)
            # The lag features below are positional, so the rows must be in
            # date order regardless of how the caller ordered df.
            df2 = (
                pd.concat([history, pred_period])
                .sort_values('part_date')
                .reset_index(drop=True)
            )

            # Lag features: value of the previous row with the same weekday,
            # of the previous row, and the holiday flag of the neighbouring rows.
            df2['qty_sum_prev_week'] = df2.groupby(["center_id", "dayofweek"])['qty_sum'].shift(1)
            df2['qty_sum_prev_day'] = df2.groupby("center_id")['qty_sum'].shift(1)
            df2['qty_sum_prev_holiday'] = df2.groupby("center_id")['holiday_yn'].shift(1)
            df2['qty_sum_next_holiday'] = df2.groupby("center_id")['holiday_yn'].shift(-1)

            # Split into training rows (before std_dt) and forecast rows.
            is_train = df2['part_date'] < std_ts
            train = df2[is_train]
            test = df2[~is_train]
            train_X = train.drop(non_feature_cols, axis=1)
            train_y = train['qty_sum']
            test_X = test.drop(non_feature_cols, axis=1)

            # Fit a fresh copy of the configured regressor for this center.
            model = self._new_model()
            model.fit(train_X, train_y)
            pred_df = test[['part_date', 'center_id']].reset_index(drop=True)
            pred_df['qty_sum'] = np.asarray(model.predict(test_X)).round(0).astype(int)
            per_center.append(pred_df)

        if not per_center:
            return pd.DataFrame(columns=['part_date', 'center_id', 'qty_sum'])
        # Single concat instead of DataFrame.append inside the loop.
        return pd.concat(per_center)

In [3]:
# Length of the training window in days (224 = 32 full weeks).
time_period = 224

# Forecaster instance; lgb_reg is passed as its reg_model argument.
pred_test = BmartWarehousePredModel(lgb_reg, time_period)

In [4]:
# Run the training-data query through Spark and load the result into pandas.
df = pred_test.fetch_train_df(spark)

In [5]:
# Run the holiday-calendar query through Spark and load the result into pandas.
holiday = pred_test.fetch_holiday_df(spark)

In [6]:
# Forecast purchase-order receiving quantities (dtype 'rcvng_ord').
pred_res_order = pred_test.fit_predict(df, holiday, 'rcvng_ord')

In [7]:
# Preview the first rows of the purchase-order forecast.
pred_res_order.head()

Unnamed: 0,part_date,center_id,qty_sum
0,2020-06-19,1,9855
1,2020-06-20,1,4700
2,2020-06-21,1,836
3,2020-06-22,1,3509
4,2020-06-23,1,2039


In [None]:
# Export the purchase-order forecast to Excel; the date in the file name is hard-coded.
pred_res_order.to_excel("pred_res_ord_20200619.xlsx")

In [8]:
# Forecast transfer receiving quantities (dtype 'rcvng_invtrford').
pred_res_tns = pred_test.fit_predict(df, holiday, 'rcvng_invtrford')

In [11]:
# Preview the first rows of the transfer forecast.
pred_res_tns.head()

Unnamed: 0,part_date,center_id,qty_sum
0,2020-06-19,1,4544
1,2020-06-20,1,2309
2,2020-06-21,1,2632
3,2020-06-22,1,3135
4,2020-06-23,1,3237


In [None]:
# Export the transfer forecast to Excel; the date in the file name is hard-coded.
pred_res_tns.to_excel("pred_res_tns_20200619.xlsx")