source: https://www.kaggle.com/kyakovlev/m5-lags-features from https://www.kaggle.com/ejunichi/m5-three-shades-of-dark-darker-magic

In [1]:
import sys
import os
import pathlib
import gc
import pandas as pd
pd.set_option('display.max_columns', 500)
# pd.set_option('display.max_rows', 500)
import numpy as np
import math
import random
import pickle
import time
from datetime import date
from datetime import timedelta
from datetime import datetime
import psutil
import warnings

# custom import
from sklearn.preprocessing import LabelEncoder
from multiprocessing import Pool        # Multiprocess Runs

# warnings.filterwarnings('ignore')

# constant variables for helper functions

In [2]:
# CPU count reported by psutil; run_df_in_multiprocess uses it as the upper
# bound on the number of worker processes.
N_CORES = psutil.cpu_count()     # Available CPU cores
print(f"N_CORES: {N_CORES}")

N_CORES: 16


# function for nicely displaying the head of a pandas DataFrame

In [3]:
import IPython

def display(*dfs, head=True):
    """Show each given object through IPython's rich display.

    Args:
        *dfs: objects to render; each must provide ``.head()`` when ``head`` is truthy.
        head: when truthy only ``obj.head()`` is rendered, otherwise the object itself.
    """
    for frame in dfs:
        shown = frame.head() if head else frame
        IPython.display.display(shown)

# function fixing random seeds

In [4]:
def seed_everything(seed=0):
    """Seed Python's ``random`` module and NumPy's global random state.

    Args:
        seed (int): value passed to both ``random.seed`` and ``np.random.seed``.
    """
    for apply_seed in (random.seed, np.random.seed):
        apply_seed(seed)

# Seed the random generators once, right after the helper is defined, so that
# later random draws in this notebook are repeatable.
SEED = 42
seed_everything(SEED)

# function processing df in multiprocess

In [5]:
def run_df_in_multiprocess(func, t_split):
    """Apply ``func`` to every element of ``t_split`` in worker processes.

    The per-element results are concatenated column-wise (``axis=1``) into a
    single DataFrame.

    Args:
        func: picklable callable taking one element of ``t_split`` and
            returning a pandas object that ``pd.concat`` accepts.
        t_split: sequence of work items; one task is submitted per item.

    Returns:
        The column-wise concatenation of all results.

    Raises:
        ValueError: if ``t_split`` is empty (a pool needs at least one process).
        Exception: whatever ``func`` raises in a worker is re-raised here.
    """
    # Never start more workers than there are work items.
    num_cores = int(min(N_CORES, len(t_split)))
    pool = Pool(num_cores)
    try:
        frames = pool.map(func, t_split)
    finally:
        # Always release the worker processes, even when a task fails;
        # otherwise they would linger until the kernel is restarted.
        pool.close()
        pool.join()
    return pd.concat(frames, axis=1)

# other helper functions

In [6]:
def get_memory_usage():
    """Simple "memory profiler" for checking the current memory usage.

    Returns:
        The first field of ``psutil``'s ``memory_info()`` for this process,
        divided by 2**30 and rounded to two decimals.
    """
    this_process = psutil.Process(os.getpid())
    used_bytes = this_process.memory_info()[0]
    return np.round(used_bytes / 2.**30, 2)
        
def sizeof_fmt(num, suffix='B'):
    """Format a size as a human-readable string with binary (1024-based) prefixes.

    Args:
        num: the size to format.
        suffix: unit text appended after the prefix.

    Returns:
        A string such as ``"1.5KiB"``; sizes beyond the ``Zi`` range use ``Yi``.
    """
    prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
    position = 0
    while position < len(prefixes):
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, prefixes[position], suffix)
        num /= 1024.0
        position += 1
    return "%.1f%s%s" % (num, 'Yi', suffix)


def merge_by_concat(df1, df2, merge_on):
    """Left-join ``df2`` onto ``df1`` via concatenation so ``df1`` keeps its dtypes.

    Only the key columns of ``df1`` take part in the merge; the columns that
    the merge adds are then attached to the untouched ``df1`` with
    ``pd.concat(axis=1)``.

    Args:
        df1: left DataFrame; its rows, order and index are preserved.
        df2: right DataFrame providing the additional columns.
        merge_on: key column name, or list of key column names, present in both.

    Returns:
        A new DataFrame: ``df1`` followed by the non-key columns of ``df2``.
    """
    # Accept a single column name as well as a list of names.
    merge_on = [merge_on] if isinstance(merge_on, str) else list(merge_on)

    merged_gf = df1[merge_on].merge(df2, on=merge_on, how='left')
    # merge() returns a fresh 0..n-1 index, while concat(axis=1) aligns on the
    # index. Restore df1's index so rows stay paired when df1 is not indexed
    # 0..n-1. The lengths only differ when df2 has duplicate keys; in that
    # case the previous behaviour is kept unchanged.
    if len(merged_gf) == len(df1):
        merged_gf.index = df1.index

    new_columns = [col for col in list(merged_gf) if col not in merge_on]
    return pd.concat([df1, merged_gf[new_columns]], axis=1)

#  constant variables for data import

In [7]:
# Data directory, relative to the project root that the loading cells resolve.
_DATA_DIR = os.path.sep.join(["data", "M5_Three_shades_of_Dark_Darker_magic"])

# Names of the pickled grids inside _DATA_DIR that are combined into base_df.
# NOTE(review): "clearned" is presumably the spelling of the file on disk --
# confirm before correcting it here.
BASE = "clearned_base_grid_for_darker_magic.pkl"
PRICE = "base_grid_with_sales_price_features_for_darker_magic.pkl"
CALENDAR = "base_grid_with_calendar_features_for_darker_magic.pkl"


# function importing data

In [8]:
def reduce_mem_usage(df, verbose=True):
    """
    Reduce the memory usage of the given dataframe by downcasting numeric columns.
    https://qiita.com/hiroyuki_kageyama/items/02865616811022f79754

    Each int16/int32/int64/float16/float32/float64 column is cast to the
    narrowest dtype of the same kind whose range contains the column's
    min and max. Other columns are left untouched.

    Args:
        df: Dataframe. It is modified in place and also returned.
        verbose: when truthy, print the resulting size and the reduction in percent.

    Returns:
        df, whose memory usage is reduced.

    Raises:
        None

    Note:
        Only the value range is checked. Casting floats to float16/float32
        can lose precision.
    """
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:  # handle one column at a time
        col_type = df[col].dtypes
        if col_type not in numerics:
            continue
        c_min = df[col].min()
        c_max = df[col].max()
        if str(col_type)[:3] == 'int':
            # Inclusive bounds: a value equal to a dtype's limit still fits.
            for candidate in (np.int8, np.int16, np.int32, np.int64):
                limits = np.iinfo(candidate)
                if c_min >= limits.min and c_max <= limits.max:
                    df[col] = df[col].astype(candidate)
                    break
        else:
            for candidate in (np.float16, np.float32):
                limits = np.finfo(candidate)
                if c_min >= limits.min and c_max <= limits.max:
                    df[col] = df[col].astype(candidate)
                    break
            else:
                # Out of float32 range, or min/max is NaN/inf: keep full width.
                df[col] = df[col].astype(np.float64)
    end_mem = df.memory_usage().sum() / 1024**2
    if verbose:
        # Avoid a 0/0 when the frame reports no memory at all.
        reduction = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
        print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, reduction))
    return df

def read_data(directory, file_name):
    """Read a CSV from ``<directory>/<_DATA_DIR>/<file_name>`` and downcast it.

    Args:
        directory: root directory; converted with ``str()`` before joining.
        file_name: name of the CSV file inside ``_DATA_DIR``.

    Returns:
        The loaded DataFrame after ``reduce_mem_usage``.
    """
    print('Reading files...')
    csv_path = os.path.sep.join([str(directory), _DATA_DIR, file_name])
    df = reduce_mem_usage(pd.read_csv(csv_path))
    n_rows, n_cols = df.shape[0], df.shape[1]
    print('{} has {} rows and {} columns'.format(file_name, n_rows, n_cols))

    return df

# read pickle

In [9]:
# Project root: two directory levels above the current working directory.
parent_dir = pathlib.Path(os.path.abspath(os.curdir)).parent.parent
# Build one wide frame by placing the three pickled grids side by side.
# The first two columns of PRICE and CALENDAR are dropped with iloc[:, 2:]
# (presumably the id/d key columns already present in BASE -- TODO confirm).
# NOTE(review): concat(axis=1) aligns rows on the index, so the three pickles
# are assumed to share the same row index -- confirm.
base_df = pd.concat([pd.read_pickle(os.path.sep.join([str(parent_dir), _DATA_DIR, BASE])),
                pd.read_pickle(os.path.sep.join([str(parent_dir), _DATA_DIR, PRICE])).iloc[:,2:],
                pd.read_pickle(os.path.sep.join([str(parent_dir), _DATA_DIR, CALENDAR])).iloc[:,2:]],
                axis=1)

print(base_df)

                                     id        item_id    dept_id   cat_id  \
0         HOBBIES_1_008_CA_1_validation  HOBBIES_1_008  HOBBIES_1  HOBBIES   
1         HOBBIES_1_009_CA_1_validation  HOBBIES_1_009  HOBBIES_1  HOBBIES   
2         HOBBIES_1_010_CA_1_validation  HOBBIES_1_010  HOBBIES_1  HOBBIES   
3         HOBBIES_1_012_CA_1_validation  HOBBIES_1_012  HOBBIES_1  HOBBIES   
4         HOBBIES_1_015_CA_1_validation  HOBBIES_1_015  HOBBIES_1  HOBBIES   
...                                 ...            ...        ...      ...   
46881672    FOODS_3_823_WI_3_validation    FOODS_3_823    FOODS_3    FOODS   
46881673    FOODS_3_824_WI_3_validation    FOODS_3_824    FOODS_3    FOODS   
46881674    FOODS_3_825_WI_3_validation    FOODS_3_825    FOODS_3    FOODS   
46881675    FOODS_3_826_WI_3_validation    FOODS_3_826    FOODS_3    FOODS   
46881676    FOODS_3_827_WI_3_validation    FOODS_3_827    FOODS_3    FOODS   

         store_id state_id     d  sales  release  sell_price  p

# constant variables for preprocessing

In [10]:
# The historical data ranges from 2011-01-29 to 2016-06-19.
# Day number 1 in column "d" is mapped to this date by the conversion cell.
_FIRST_SALES_DATE = date(year=2011, month=1, day=29)

# Number of unique items and length of the prediction period (in days).
_NUM_UNIQUE_ITEM_ID = 30490
_DAYS_FOR_PREDICTION = 28

# Row budget: two years of daily rows for every item.
DAYS_PER_YEAR = 365
_NUM_YEARS_FOR_MELT = 2
_NUM_IMPORT_ROWS_FOR_MELT = DAYS_PER_YEAR * _NUM_YEARS_FOR_MELT * _NUM_UNIQUE_ITEM_ID
print(f"_NUM_IMPORT_ROWS_FOR_MELT: {_NUM_IMPORT_ROWS_FOR_MELT}")

_SALES_HISTORY_DAYS = 1913 # And we will use last 28 days as validation
# First day number after the sales history (1914).
_SALES_HISTORY_START_DAYS_FOR_VALIDATION = _SALES_HISTORY_DAYS + 1
# NOTE(review): 1942 equals 1913 + 28 + 1, presumably the first day after the
# validation window -- confirm.
_SALES_HISTORY_START_DAYS_FOR_EVALUATION = 1942

TARGET = 'sales' # Our Target
MAIN_INDEX = ['id','d']  # We can identify items by these columns
# True: load the already converted pickle; False: convert "d" and export it.
USE_PREPROCESSED_DATAFRAME = True

_NUM_IMPORT_ROWS_FOR_MELT: 22257700


In [11]:
# Project root: two directory levels above the current working directory.
parent_dir = pathlib.Path(os.path.abspath(os.curdir)).parent.parent
print(f"parent_dir: {parent_dir}")
_EXPORT_FILE_NAME = "aws_forecast_base_renamed_preprocessed_df.pkl"

# convert "d" column to date. the format is like "2019-01-01". source: https://docs.aws.amazon.com/forecast/latest/dg/forecast.dg.pdf
current_date = _FIRST_SALES_DATE
max_num_days = base_df['d'].max()
print(f"max_num_days: {max_num_days}")

# change the data format of "d" from int to string
# NOTE: this mutates base_df, so re-running the cell without reloading base_df
# would compute max_num_days from strings.
base_df['d'] = base_df['d'].astype(str)
print(f"base_df.dtypes: {base_df.dtypes}")

# Location of the cached, already converted frame (written or read below).
pkl_path = os.path.sep.join([str(parent_dir), _DATA_DIR, _EXPORT_FILE_NAME])

if not USE_PREPROCESSED_DATAFRAME:
    # One lookup table "day number as string" -> "YYYY-MM-DD", applied in a
    # single pass, instead of one full boolean-mask scan of the frame per day.
    day_to_date = {
        str(day): (_FIRST_SALES_DATE + timedelta(days=day - 1)).strftime('%Y-%m-%d')
        for day in range(1, int(max_num_days) + 1)
    }
    converted_days = base_df['d'].map(day_to_date)
    # Values without a table entry (outside 1..max_num_days) are kept
    # unchanged, exactly as the per-day replacement left them.
    base_df['d'] = converted_days.where(converted_days.notna(), base_df['d'])
    del converted_days

    # change the column name
    base_renamed_df = base_df.rename(columns={'d': 'timestamp'})
    print(f"base_renamed_df: {base_renamed_df}")

    # export the converted dataframe as pickle file
    print("data export start")
    base_renamed_df.to_pickle(pkl_path)
    print('data export finished. Size:', base_renamed_df.shape)

else:
    #  read pickle file
    base_renamed_df = pd.read_pickle(pkl_path)


parent_dir: /home/ec2-user/SageMaker
max_num_days: 1941
base_df.dtypes: id                  category
item_id             category
dept_id             category
cat_id              category
store_id            category
state_id            category
d                     object
sales                float64
release                int16
sell_price           float16
price_max            float16
price_min            float16
price_std            float16
price_mean           float16
price_norm           float16
price_nunique        float16
item_nunique           int16
price_momentum       float16
price_momentum_m     float16
price_momentum_y     float16
event_name_1        category
event_type_1        category
event_name_2        category
event_type_2        category
snap_CA             category
snap_TX             category
snap_WI             category
tm_d                    int8
tm_w                    int8
tm_m                    int8
tm_y                    int8
tm_wm                   int8


# extract the columns

In [12]:
#     id
#     item_id
#     dept_id
#     cat_id
#     store_id
#     state_id
#     d
#     sales
#     sell_price
# tm_dw: day of week <- delete due to the number of feature limitation for aws forecast
# tm_w_end: weekend
# Keep only the columns listed above.
extracted_base_df = base_renamed_df.loc[:,['id','item_id','dept_id','cat_id','store_id','state_id','timestamp','sales','sell_price','tm_w_end']]
print(extracted_base_df)

# Same selection without the per-series "id" column.
# NOTE(review): this frame is not referenced again in the visible cells --
# confirm it is still needed, since it holds a second copy of the data.
extracted_base_without_id_df = extracted_base_df.drop('id', axis=1)

                                     id        item_id    dept_id   cat_id  \
0         HOBBIES_1_008_CA_1_validation  HOBBIES_1_008  HOBBIES_1  HOBBIES   
1         HOBBIES_1_009_CA_1_validation  HOBBIES_1_009  HOBBIES_1  HOBBIES   
2         HOBBIES_1_010_CA_1_validation  HOBBIES_1_010  HOBBIES_1  HOBBIES   
3         HOBBIES_1_012_CA_1_validation  HOBBIES_1_012  HOBBIES_1  HOBBIES   
4         HOBBIES_1_015_CA_1_validation  HOBBIES_1_015  HOBBIES_1  HOBBIES   
...                                 ...            ...        ...      ...   
46881672    FOODS_3_823_WI_3_validation    FOODS_3_823    FOODS_3    FOODS   
46881673    FOODS_3_824_WI_3_validation    FOODS_3_824    FOODS_3    FOODS   
46881674    FOODS_3_825_WI_3_validation    FOODS_3_825    FOODS_3    FOODS   
46881675    FOODS_3_826_WI_3_validation    FOODS_3_826    FOODS_3    FOODS   
46881676    FOODS_3_827_WI_3_validation    FOODS_3_827    FOODS_3    FOODS   

         store_id state_id   timestamp  sales  sell_price  tm_w

# reformat the columns for aws forecast 

In [13]:
# Cast the extra feature columns to strings.
# A Target Time Series dataset may carry features that are not listed in the
# developer guide, but only as strings.
# sell_price has to go into a separate dataset of the Related Time Series type.
for text_col in ("sell_price", "tm_w_end"):
    extracted_base_df[text_col] = extracted_base_df[text_col].astype(str)

# Rename in one mapping: the per-series "id" becomes "item_id" and the
# original "item_id" becomes "item_category".
extracted_base_df = extracted_base_df.rename(
    columns={'item_id': 'item_category', 'id': 'item_id'}
)

In [14]:
# Custom domain: the target column is called "target_value".
base_renamed_custom_domain_df = extracted_base_df.rename(columns={'sales': 'target_value'})

# Retail domain: the target is "demand" and the price column is "price" (float32).
base_renamed_retail_domain_df = extracted_base_df.rename(
    columns={'sales': 'demand', 'sell_price': 'price'}
)
base_renamed_retail_domain_df["price"] = base_renamed_retail_domain_df["price"].astype('float32')

# The price goes into the related dataset and is removed from the retail
# domain frame itself.
base_renamed_retail_domain_related_dataset_df = base_renamed_retail_domain_df.loc[
    :, ['item_id', 'timestamp', 'price']
]
base_renamed_retail_domain_df = base_renamed_retail_domain_df.drop(columns='price')

print(f"base_renamed_custom_domain_df: {base_renamed_custom_domain_df}")
print(f"base_renamed_retail_domain_df: {base_renamed_retail_domain_df}")
print(f"base_renamed_retail_domain_related_dataset_df: {base_renamed_retail_domain_related_dataset_df}")

base_renamed_custom_domain_df:                                 item_id  item_category    dept_id   cat_id  \
0         HOBBIES_1_008_CA_1_validation  HOBBIES_1_008  HOBBIES_1  HOBBIES   
1         HOBBIES_1_009_CA_1_validation  HOBBIES_1_009  HOBBIES_1  HOBBIES   
2         HOBBIES_1_010_CA_1_validation  HOBBIES_1_010  HOBBIES_1  HOBBIES   
3         HOBBIES_1_012_CA_1_validation  HOBBIES_1_012  HOBBIES_1  HOBBIES   
4         HOBBIES_1_015_CA_1_validation  HOBBIES_1_015  HOBBIES_1  HOBBIES   
...                                 ...            ...        ...      ...   
46881672    FOODS_3_823_WI_3_validation    FOODS_3_823    FOODS_3    FOODS   
46881673    FOODS_3_824_WI_3_validation    FOODS_3_824    FOODS_3    FOODS   
46881674    FOODS_3_825_WI_3_validation    FOODS_3_825    FOODS_3    FOODS   
46881675    FOODS_3_826_WI_3_validation    FOODS_3_826    FOODS_3    FOODS   
46881676    FOODS_3_827_WI_3_validation    FOODS_3_827    FOODS_3    FOODS   

         store_id state_id   tim

In [15]:

# Sanity check: list the distinct series identifiers in the retail domain frame.
base_renamed_retail_domain_df['item_id'].unique()


[HOBBIES_1_008_CA_1_validation, HOBBIES_1_009_CA_1_validation, HOBBIES_1_010_CA_1_validation, HOBBIES_1_012_CA_1_validation, HOBBIES_1_015_CA_1_validation, ..., HOUSEHOLD_1_278_CA_3_validation, FOODS_3_595_CA_3_validation, HOUSEHOLD_1_400_CA_4_validation, HOUSEHOLD_1_386_WI_1_validation, HOUSEHOLD_1_020_WI_2_validation]
Length: 30490
Categories (30490, object): [HOBBIES_1_008_CA_1_validation, HOBBIES_1_009_CA_1_validation, HOBBIES_1_010_CA_1_validation, HOBBIES_1_012_CA_1_validation, ..., FOODS_3_595_CA_3_validation, HOUSEHOLD_1_400_CA_4_validation, HOUSEHOLD_1_386_WI_1_validation, HOUSEHOLD_1_020_WI_2_validation]

# upload dataframe to s3

In [18]:
# Drop the large intermediate frames that are no longer needed before the
# upload, then ask the garbage collector to reclaim them.
del base_df, base_renamed_df, extracted_base_df,
gc.collect()

def write_df_to_s3(df, outpath):
    """Write ``df`` as a CSV file (without the index) to ``outpath`` on S3.

    The whole CSV is built in memory, encoded to bytes and written through
    ``s3fs``. No credentials are passed explicitly; ``S3FileSystem()`` is
    created with its defaults.

    Args:
        df: DataFrame to upload.
        outpath: destination path, concatenated into the log message as a string.
    """
    import s3fs
    payload = df.to_csv(None, index=False).encode()
    s3 = s3fs.S3FileSystem()
    print("file upload started: " + outpath)
    with s3.open(outpath, 'wb') as remote_file:
        remote_file.write(payload)
    print("file upload finished")

bucket_name = 'sagemaker-m5-forecasting-okada' # Replace with your s3 bucket name
path = 'accuracy/aws_forecast'

# (file name, frame) pairs, uploaded in this order.
uploads = [
    ("m5_accuracy_base_renamed_custom_domain_df.csv", base_renamed_custom_domain_df),
    ("m5_accuracy_base_renamed_retail_domain_df.csv", base_renamed_retail_domain_df),
    ("m5_accuracy_base_renamed_retail_domain_related_dataset_df.csv", base_renamed_retail_domain_related_dataset_df),
]

for file_name, upload_df in uploads:
    url = 's3://{}/{}/{}'.format(bucket_name, path, file_name)
    print(f"url: {url}")
    write_df_to_s3(upload_df, url)

url: s3://sagemaker-m5-forecasting-okada/accuracy/aws_forecast/m5_accuracy_base_renamed_custom_domain_df.csv
file upload started: s3://sagemaker-m5-forecasting-okada/accuracy/aws_forecast/m5_accuracy_base_renamed_custom_domain_df.csv
file upload finished
url: s3://sagemaker-m5-forecasting-okada/accuracy/aws_forecast/m5_accuracy_base_renamed_retail_domain_df.csv
file upload started: s3://sagemaker-m5-forecasting-okada/accuracy/aws_forecast/m5_accuracy_base_renamed_retail_domain_df.csv
file upload finished
url: s3://sagemaker-m5-forecasting-okada/accuracy/aws_forecast/m5_accuracy_base_renamed_retail_domain_related_dataset_df.csv
file upload started: s3://sagemaker-m5-forecasting-okada/accuracy/aws_forecast/m5_accuracy_base_renamed_retail_domain_related_dataset_df.csv
file upload finished


In [None]:
# from io import StringIO # python3; python2: BytesIO 
# import boto3

# bucket = 'my_bucket_name' # already created on S3
# csv_buffer = StringIO()
# df.to_csv(csv_buffer)
# s3_resource = boto3.resource('s3')
# s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())