In [221]:
import time
import numpy as np
import pandas as pd
from dateutil.parser import parse
from datetime import date, timedelta
from sklearn.preprocessing import LabelEncoder
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait
import functools  

import arrow
import math



In [139]:
data_path = r'/Users/kevindu/Documents/workspace/ai/data/RRVF/' # r'E:\workspace\ai\ml_utils\proj\RRVF\data\\'

def get_data(data_path):
    air_reserve = pd.read_csv(data_path + 'air_reserve.csv').rename(columns={'air_store_id':'store_id'})
    hpg_reserve = pd.read_csv(data_path + 'hpg_reserve.csv').rename(columns={'hpg_store_id':'store_id'})
    air_store = pd.read_csv(data_path + 'air_store_info.csv').rename(columns={'air_store_id':'store_id'})
    hpg_store = pd.read_csv(data_path + 'hpg_store_info.csv').rename(columns={'hpg_store_id':'store_id'})
    air_visit = pd.read_csv(data_path + 'air_visit_data.csv').rename(columns={'air_store_id':'store_id'})
    store_id_map = pd.read_csv(data_path + 'store_id_relation.csv').set_index('hpg_store_id',drop=False)
    date_info = pd.read_csv(data_path + 'date_info.csv').rename(columns={'calendar_date': 'visit_date'}).drop('day_of_week',axis=1)
    submission = pd.read_csv(data_path + 'sample_submission.csv')

    submission['visit_date'] = submission['id'].str[-10:]
    submission['store_id'] = submission['id'].str[:-11]
    air_reserve['visit_date'] = air_reserve['visit_datetime'].str[:10]
    air_reserve['reserve_date'] = air_reserve['reserve_datetime'].str[:10]
    air_reserve['dow'] = pd.to_datetime(air_reserve['visit_date']).dt.dayofweek
    hpg_reserve['visit_date'] = hpg_reserve['visit_datetime'].str[:10]
    hpg_reserve['reserve_date'] = hpg_reserve['reserve_datetime'].str[:10]
    hpg_reserve['dow'] = pd.to_datetime(hpg_reserve['visit_date']).dt.dayofweek
    air_visit['id'] = air_visit['store_id'] + '_' + air_visit['visit_date']
    hpg_reserve['store_id'] = hpg_reserve['store_id'].map(store_id_map['air_store_id']).fillna(hpg_reserve['store_id'])
    hpg_store['store_id'] = hpg_store['store_id'].map(store_id_map['air_store_id']).fillna(hpg_store['store_id'])
    # consider genre in hpg as air genre
    hpg_store.rename(columns={'hpg_genre_name':'air_genre_name','hpg_area_name':'air_area_name'},inplace=True)
    data = pd.concat([air_visit, submission]).copy()
    data['dow'] = pd.to_datetime(data['visit_date']).dt.dayofweek

    # take weekend 5 6 1, as a kind of holiday
    # dow is a very important feature
    date_info['holiday_flg2'] = pd.to_datetime(date_info['visit_date']).dt.dayofweek
    date_info['holiday_flg2'] = ((date_info['holiday_flg2']>4) | (date_info['holiday_flg']==1)).astype(int)

    # Split on area name, should also consider the number of competitors within a distance
    air_store['air_area_name0'] = air_store['air_area_name'].apply(lambda x: x.split(' ')[0])
    lbl = LabelEncoder()
    air_store['air_genre_name'] = lbl.fit_transform(air_store['air_genre_name'])
    air_store['air_area_name0'] = lbl.fit_transform(air_store['air_area_name0'])

    # per the chanllege request
    data['visitors'] = np.log1p(data['visitors'])
    data = data.merge(air_store,on='store_id',how='left')
    data = data.merge(date_info[['visit_date','holiday_flg','holiday_flg2']], on=['visit_date'],how='left')
    result = {
        "data": data,
        "hpg_store": hpg_store,
        "air_store": air_store,
        "air_reserve": air_reserve,
        "hpg_reserve": hpg_reserve,
    }
    return result

In [243]:
def concat(L):
    result = None
    for l in L:
        if result is None:
            result = l
        else:
            try:
                result[l.columns.tolist()] = l
            except:
                print(l.head())
    return result

def left_merge(data1,data2,on):
    if type(on) != list:
        on = [on]
    if (set(on) & set(data2.columns)) != set(on):
        data2_temp = data2.reset_index()
    else:
        data2_temp = data2.copy()
    columns = [f for f in data2.columns if f not in on]
    result = data1.merge(data2_temp,on=on,how='left')
    result = result[columns]
    return result


def diff_of_days(day1, day2):
    days = (parse(day1[:10]) - parse(day2[:10])).days
    return days

def date_add_days(start_date, days):
    end_date = parse(start_date[:10]) + timedelta(days=days)
    end_date = end_date.strftime('%Y-%m-%d')
    return end_date

def get_label(end_date, n_day):
    """ 
    end_date : end of statistic set
    n_day: the span of label set
    """
    label_end_date = date_add_days(end_date, n_day)
    label = data[(data['visit_date'] < label_end_date) & (data['visit_date'] >= end_date)].copy()
    label['end_date'] = end_date
    # diff of pivot date and visit date
    # related to weighting
    label['diff_of_day'] = label['visit_date'].apply(lambda x: diff_of_days(x,end_date))
    label['month'] = label['visit_date'].str[5:7].astype(int)
    label['year'] = label['visit_date'].str[:4].astype(int)
    # before & after holiday trend
    for i in [3,2,1,-1]:
        date_info_temp = date_info.copy()
        date_info_temp['visit_date'] = date_info_temp['visit_date'].apply(lambda x: date_add_days(x,i))
        date_info_temp.rename(columns={'holiday_flg':'ahead_holiday_{}'.format(i),'holiday_flg2':'ahead_holiday2_{}'.format(i)},inplace=True)
        label = label.merge(date_info_temp, on=['visit_date'],how='left')
    label = label.reset_index(drop=True)
    return label

def get_store_visitor_feat(label, key, n_day):
    start_date = date_add_days(key[0],-n_day)
    data_temp = data[(data.visit_date < key[0]) & (data.visit_date > start_date)].copy()
    result = data_temp.groupby(['store_id'], as_index=False)['visitors'].agg({'store_min{}'.format(n_day): 'min',
                                                                             'store_mean{}'.format(n_day): 'mean',
                                                                             'store_median{}'.format(n_day): 'median',
                                                                             'store_max{}'.format(n_day): 'max',
                                                                             'store_count{}'.format(n_day): 'count',
                                                                             'store_std{}'.format(n_day): 'std',
                                                                             'store_skew{}'.format(n_day): 'skew'})
    result = left_merge(label, result, on=['store_id']).fillna(0)
    return result

In [238]:
def make_feats(data_dict, 
                win,
                label_getter, 
                fes=[], 
                high_eng=None):
        pivot_date = win['pivot_date']
        days_in_label = win['days_in_label']
        key = pivot_date, days_in_label # the idx of label set
        label = label_getter(pivot_date, days_in_label)

        result = [label]
        for feature_eng in fes:
            result.append(feature_eng(label, key))
        result.append(label)
        result = concat(result)
        if high_eng:
            result = high_eng(result)
        return result

class TimeseriesDataset():
    def __init__(
            self,
            pivot_date,
            end_date,
            data_dict,
            date_col,
            date_step,
            days_in_label,
            min_num_in_stat_set,
            label_getter, 
            fes=[], 
            high_eng=None
            ):
        self.__pivot_date = pivot_date
        self.__data_dict = data_dict
        self.__date_col = date_col
        self.__end_date = end_date
        self.__date_step = date_step
        self.__days_in_label = days_in_label
        merged_data = data_dict['data']
        windows = []
        max_date = arrow.get(pivot_date)
        min_date = arrow.get(merged_data.visit_date.min())
        delta = (max_date - min_date).days - min_num_in_stat_set
        nwindows_bf_pivot = int((delta ) / date_step )
        nwindows_af_pivot = math.floor((arrow.get(end_date) - arrow.get(pivot_date)).days / date_step)
        
        start_date = min_date.shift(days=min_num_in_stat_set)
        for day_delta in range(nwindows_bf_pivot):
            # >= start & < end
            windows.append(
                {
                    "pivot_date": start_date.format('YYYY-MM-DD'),
                    "days_in_label": days_in_label
                }
            )
            start_date = start_date.shift(days=date_step)
        start_date = max_date.shift(days=date_step)
        ndays_unit_af_pivot = days_in_label  - days_in_label % date_step
        for day_delta in range(nwindows_af_pivot):
            adaptive_len = ndays_unit_af_pivot - ( day_delta * date_step )
            windows.append(
                {
                    "pivot_date": start_date.format('YYYY-MM-DD'),
                    "days_in_label": adaptive_len
                }
            )
            start_date = start_date.shift(days=date_step)
        self.__windows = windows
        print('nwindows_bf_pivot:{}, nwindows_af_pivot {}'
              .format(nwindows_bf_pivot, nwindows_af_pivot))
        print('First window {}'.format(windows[0]))
        print('Last window {}'.format(windows[-1]))
        self.__label_getter = label_getter
        self.__fes= fes
        self.__high_eng = high_eng

        
    def get_trn(self, concurrency=2):
        feats, results = [], []
        step_task = int(len(self.__windows) / 100)
        num_tasks = len(self.__windows)        
        with ThreadPoolExecutor(max_workers=concurrency) as executor:
            feats = executor.map(
                lambda x: make_feats(win=x,
                                    data_dict=self.__data_dict,
                                      label_getter=self.__label_getter,
                                      fes=self.__fes, 
                                      high_eng=self.__high_eng), 
                self.__windows)
        train_feat = pd.concat(feats)
        return train_feat

    def get_test(self, start_date, ndays):
        feats = []
        test_feat = self.make_feats(pivot_date=start_date, ndays=ndays)
        return test_feat

    def generate_trn(self):
        pass

In [160]:
data_dict = get_data(data_path)

In [247]:
fes= [
    lambda label, key: get_store_visitor_feat(label, key, 1000),
    lambda label, key: get_store_visitor_feat(label, key, 56)
]

ts_data = TimeseriesDataset(
    pivot_date='2016-03-12',
    end_date='2016-04-22',
    data_dict=data_dict,
    date_col='visit_date',
    date_step=7,
    days_in_label=39,
    min_num_in_stat_set=37,
    label_getter=get_label, 
    fes=fes, 
    high_eng=None
)


nwindows_bf_pivot:4, nwindows_af_pivot 5
First window {'pivot_date': '2016-02-07', 'days_in_label': 39}
Last window {'pivot_date': '2016-04-16', 'days_in_label': 7}


In [248]:
trn = ts_data.get_trn(4)

In [249]:
trn.head()

Unnamed: 0,id,store_id,visit_date,visitors,dow,air_genre_name,air_area_name,latitude,longitude,air_area_name0,...,store_count1000,store_std1000,store_skew1000,store_min56,store_mean56,store_median56,store_max56,store_count56,store_std56,store_skew56
0,air_ba937bf13d40fb24_2016-02-08,air_ba937bf13d40fb24,2016-02-08,2.995732,0,4,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,7,...,20.0,0.505541,-0.752183,1.94591,2.978754,3.091042,3.828641,20.0,0.505541,-0.752183
1,air_ba937bf13d40fb24_2016-02-09,air_ba937bf13d40fb24,2016-02-09,2.772589,1,4,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,7,...,20.0,0.505541,-0.752183,1.94591,2.978754,3.091042,3.828641,20.0,0.505541,-0.752183
2,air_ba937bf13d40fb24_2016-02-10,air_ba937bf13d40fb24,2016-02-10,3.496508,2,4,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,7,...,20.0,0.505541,-0.752183,1.94591,2.978754,3.091042,3.828641,20.0,0.505541,-0.752183
3,air_ba937bf13d40fb24_2016-02-11,air_ba937bf13d40fb24,2016-02-11,1.386294,3,4,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,7,...,20.0,0.505541,-0.752183,1.94591,2.978754,3.091042,3.828641,20.0,0.505541,-0.752183
4,air_ba937bf13d40fb24_2016-02-12,air_ba937bf13d40fb24,2016-02-12,3.295837,4,4,Tōkyō-to Minato-ku Shibakōen,35.658068,139.751599,7,...,20.0,0.505541,-0.752183,1.94591,2.978754,3.091042,3.828641,20.0,0.505541,-0.752183


In [150]:
trn.columns

Index(['id', 'store_id', 'visit_date', 'visitors', 'dow', 'air_genre_name',
       'air_area_name', 'latitude', 'longitude', 'air_area_name0',
       'holiday_flg', 'holiday_flg2', 'end_date', 'diff_of_day', 'month',
       'year', 'ahead_holiday_3', 'ahead_holiday2_3', 'ahead_holiday_2',
       'ahead_holiday2_2', 'ahead_holiday_1', 'ahead_holiday2_1',
       'ahead_holiday_-1', 'ahead_holiday2_-1'],
      dtype='object')