This notebook reshapes the original dataset into a dictionary keyed by (shop, item) pair, where each value is that pair's complete monthly time series (missing months are filled with 0).

Parallel processes (ipyparallel engines) are used to speed up the creation of this dictionary, which is flattened into a single table with lag variables in a separate notebook, "training_val_analysis".

In [2]:
import numpy as np
import pandas as pd
import math
import seaborn as sns
import matplotlib.pyplot as plt

In [3]:
#save and load function
import pickle
def save_obj(obj, name ):
    """Pickle ``obj`` to ``./<name>.pkl`` with the highest available protocol.

    ``name`` may contain sub-directories (e.g. ``"data/df_agg"``); they must
    already exist because the file is opened directly for writing.
    """
    target_path = './' + name + '.pkl'
    with open(target_path, 'wb') as handle:
        pickle.dump(obj, handle, pickle.HIGHEST_PROTOCOL)

def load_obj(name ):
    """Return the object stored in ``./<name>.pkl``.

    Unpickling can execute arbitrary code, so only use this on files that
    were produced by a trusted source (e.g. by ``save_obj`` in this project).
    """
    source_path = './' + name + '.pkl'
    with open(source_path, 'rb') as handle:
        loaded = pickle.load(handle)
    return loaded

In [8]:
import os
import ipyparallel as ipp

In [9]:
# Connect to the ipyparallel cluster; the engines are presumably started
# outside this notebook (nothing here launches them) — TODO confirm.
rc = ipp.Client()
# Run os.getpid on every engine (rc[:] addresses all of them) without blocking.
ar = rc[:].apply_async(os.getpid)
# Wait for the answers and collect them; per the ipyparallel docs the
# resulting dict is keyed by engine id.
pid_map = ar.get_dict()

In [10]:
rc.ids

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [13]:
#load data
df_train = load_obj("data/train_raw_merged")

In [None]:
df_train.head(5)

In [None]:
# Aggregation rules used when grouping by month, shop and item:
# item_price is averaged and item_cnt_day is summed, which collapses the
# individual (possibly duplicated) dates within a month into a single row.

agg_func = {
            'item_price':'mean',
            'item_cnt_day':'sum'
            }


In [None]:
df_agg = df_train.groupby(["date_block_num","shop_id","item_id","item_category_id","top_categories_id"]).agg(agg_func)

In [None]:
df_agg = df_agg.reset_index()

In [None]:
#save
save_obj(df_agg,"data/df_agg")

In [17]:
#load data
df_agg= load_obj("data/df_agg")

In [28]:
# The time series of every shop_id * item_id pair has to be completed:
# months without a record get 0 for both item_price and item_cnt_day.
# Collect the distinct months, shops and items present in the aggregated data.
month_num = df_agg["date_block_num"].unique()
shop_num = df_agg["shop_id"].unique()
item_num = df_agg["item_id"].unique()

In [29]:
import time

In [30]:
60*21807*0.006/60

130.842

In [67]:
#function to parallelize (mapped over a list of data frames)
def create_shop_item_dict(df_agg_input):
    """Split an aggregated frame into one entry per (shop_id, item_id) pair.

    ``df_agg_input`` must carry ``shop_id`` and ``item_id`` as ordinary columns
    and be indexed so that ``.xs(shop)`` followed by ``.xs(item)`` selects the
    rows of that pair (the notebook indexes on shop id, then item id).

    Returns a dict keyed by ``(shop, item)``. Each value is whatever ``.xs``
    yields: a DataFrame when several rows match, or a Series when exactly one
    row matches (see the notebook output for key ``(2, 4420)``).
    """
    pairs = {}
    for shop in df_agg_input["shop_id"].unique():
        # Rows of this shop only; the shop level is consumed by the lookup.
        shop_rows = df_agg_input.xs(shop)
        pairs.update(
            {(shop, item): shop_rows.xs(item) for item in shop_rows["item_id"].unique()}
        )
    return pairs

In [68]:
# Duplicate the id columns before indexing on them: create_shop_item_dict
# selects rows with .xs() on the index but still reads shop_id / item_id as
# ordinary columns, so both forms have to be available.
df_agg['shop_id_idx'] = df_agg['shop_id']
df_agg['item_id_idx'] = df_agg['item_id']
# NOTE: modifies df_agg in place; from here on it is indexed by (shop, item).
df_agg.set_index(['shop_id_idx','item_id_idx'],inplace=True)
# One sub-frame per shop, used as the work list for the parallel map.
df_agg_lst = [df_agg[df_agg.shop_id == s] for s in df_agg.shop_id.unique()]

In [69]:
len(df_agg_lst)

60

In [71]:
rc.close()

In [72]:
rc = ipp.Client()
ar = rc[:].apply_async(os.getpid)
pid_map = ar.get_dict()
v = rc.load_balanced_view()

In [70]:
rc.purge_everything()

In [74]:
#parallel process
t = time.time()
# Submit one create_shop_item_dict call per per-shop frame. `asyncs` is the
# handle returned by v.map (it is polled with ready()/successful() and read
# with get() in later cells), not a dict.
asyncs = v.map(create_shop_item_dict,df_agg_lst)
# NOTE(review): this times only the v.map call itself; if the view is
# non-blocking (as the later ready()/get() calls suggest) the tasks may still
# be running when this prints — confirm.
print("costed ", time.time() - t) 

costed  0.10752391815185547


In [75]:
rc.queue_status(targets='all', verbose=True)

{0: {'completed': ['0cb681f8-c198-44c9-ba88-ce21ae26abe6',
   '35ba3795-4141-4fb0-9263-56dbe0fd20ca',
   '5b474b2c-ec33-4ccd-a0d1-9e166b1a12ee',
   '97f8c820-c353-42c8-aa62-5e9eb6562e7d',
   'f02c5b52-79cd-48e4-b72d-01c1d8ae759d',
   '48789983-bb82-4a60-ac6a-3740ce8036f4',
   'fcbac503-ea16-4a23-8f2e-b63f03b8beeb',
   'cbfb10f4-1660-49f5-b742-b823c3d5fc37',
   '918decd6-999f-4977-95b6-2c9f9f3d2fb0',
   '5be9a9b2-1142-4e5a-b074-db17929d8797',
   'c149ceff-26b7-42c9-9d7e-cdd3206d39f8',
   'ba468f1b-d3ef-4289-be44-f3757261907e',
   '4e320bb2-1ec5-4b26-94e7-142fc101ed10',
   '31b8cdd2-66fe-4f58-87e8-c60302e9abd2',
   'a7ea34d8-4929-4edf-96f7-f094fd3e9f27',
   '088b9873-4ecc-49da-9851-8124d674b7bf',
   'db1787a8-e06a-471e-a70b-45340f2a7599',
   '98e6c063-372e-4f72-b4f4-91b4f5ade60d',
   '36032507-d48e-4a81-81c5-134173caf3df',
   'd436705b-aa30-4de3-9296-83c0e7b3827a',
   '02eadeef-a0a3-4825-9a69-70342e4e9e64',
   'c0add72e-c149-4c3a-bad4-ae55a08ae2e7',
   'be672939-53e4-4533-a268-a4e42c909e

In [78]:
asyncs.ready()

True

In [79]:
asyncs.successful()

True

In [80]:
results_arrays = asyncs.get()

In [35]:
save_obj(results_arrays,"data/results_arrays")

In [11]:
results_arrays = load_obj("data/results_arrays")

In [34]:
#purge and close rc
rc.purge_everything()
rc.close()

In [37]:
#this is a list of dictionaries
len(results_arrays)

60

In [12]:
#add up the dict from each engine
def Merge(dict1, dict2): 
    """Merge ``dict1`` into ``dict2`` in place and return ``dict2``.

    Entries of ``dict1`` overwrite entries of ``dict2`` that share a key.
    ``dict1`` itself is left unchanged.

    The previous version returned the result of ``dict.update``, which is
    always ``None``; returning the updated dictionary makes the return value
    usable while callers that ignore it behave exactly as before.
    """
    dict2.update(dict1)
    return dict2

In [13]:
# Fold the per-task dictionaries into a single one. Merge updates `t` in
# place, so its return value is not needed here. Keys are (shop, item) tuples.
t = {}
for d in results_arrays:
    Merge(d,t)

In [14]:
input_dict_all = t.copy()
len(input_dict_all)

424124

In [18]:
month_num = df_agg["date_block_num"].unique()
shop_num = df_agg["shop_id"].unique()
item_num = df_agg["item_id"].unique()

In [19]:
month_num

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33])

In [107]:
#function to be distributed in parallel
def complete_time_series(args):
    """Fill the missing months of every (shop, item) series with zero rows.

    Parameters
    ----------
    args : sequence
        ``args[0]`` -- dict mapping a ``(shop, item)`` key to that pair's data:
        a DataFrame (several rows) or a Series (a single row). Both must
        provide ``date_block_num``, ``item_category_id`` and
        ``top_categories_id``.
        ``args[1]`` -- iterable of all month numbers each series should cover.
        ``args[2]`` -- column labels for the filler rows. Values are assigned
        positionally: month, key[0], key[1], item category, top category and
        two zero-filled columns, so the labels must be in that order (the
        notebook passes the columns of one of the input frames).

    Returns
    -------
    dict
        Same keys as ``args[0]``; each value is a new DataFrame indexed by
        ``date_block_num`` and sorted by it, containing the original rows plus
        one zero-valued row per missing month. The input objects are not
        modified. The id/category values of the filler rows are built with
        ``np.ones(n) * value`` and are therefore floats.
    """
    # Imported inside the function because it is executed on the parallel
    # engines, where the notebook's own imports are presumably not available.
    import numpy as np
    import pandas as pd

    dict_input = args[0]
    month_num_lst = args[1]
    cols = args[2]
    dict_output = {}
    for k, v in dict_input.items():
        # A pair with a single record arrives as a 1-D Series instead of a
        # 2-D DataFrame.
        single_row = len(v.shape) < 2
        if single_row:
            month_temp = v["date_block_num"]
            item_cat = v['item_category_id']
            top_temp_cat = v['top_categories_id']
            m_to_add = [x for x in month_num_lst if x != month_temp]
            # Turn the Series into a one-row frame whose columns are the
            # Series' labels so it can be concatenated with the filler rows.
            df_v = pd.DataFrame(v).transpose()
        else:
            month_temp = v["date_block_num"].unique()
            item_cat = v['item_category_id'].iloc[0]
            top_temp_cat = v['top_categories_id'].iloc[0]
            m_to_add = [x for x in month_num_lst if x not in month_temp]
            df_v = v

        n = len(m_to_add)
        if n:
            filler_columns = (
                m_to_add,
                np.ones(n) * k[0],
                np.ones(n) * k[1],
                np.ones(n) * item_cat,
                np.ones(n) * top_temp_cat,
                np.zeros(n),
                np.zeros(n),
            )
            df_tmp = pd.DataFrame(list(zip(*filler_columns)), columns=cols)
            new_tmp = pd.concat([df_v, df_tmp])
        else:
            # Nothing to add: avoid concatenating with an empty, object-typed
            # frame, whose effect on the result dtypes depends on the pandas
            # version.
            new_tmp = df_v

        # set_index returns a new frame, so the caller's data stays untouched
        # even when no filler rows were appended.
        dict_output[k] = new_tmp.set_index(["date_block_num"]).sort_index()

    return dict_output

In [22]:
#complete the time series for each of the combinations of shop * item
cols = input_dict_all[0,947].columns

In [85]:
save_obj(input_dict_all,"data/input_dict_all")

In [87]:
input_dict_all_1 = load_obj("data/input_dict_all")

In [244]:
input_dict_all[1,13346]

Unnamed: 0_level_0,date_block_num,shop_id,item_id,item_category_id,top_categories_id,item_price,item_cnt_day
item_id_idx,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
13346,0,1,13346,81,20,79.0,2.0
13346,1,1,13346,81,20,79.0,3.0


In [241]:
input_dict_all_1[2,4420]

date_block_num         30.0
shop_id                 2.0
item_id              4420.0
item_category_id       55.0
top_categories_id      15.0
item_price            299.0
item_cnt_day            1.0
Name: 4420, dtype: float64

In [282]:
rc.close()

In [283]:
rc = ipp.Client()
ar = rc[:].apply_async(os.getpid)
pid_map = ar.get_dict()

In [108]:
v = rc.load_balanced_view()

In [109]:
keys_lst = [[] for s in shop_num]

In [110]:
#divide the keys into batches based on shop_id (one batch per shop)
# NOTE(review): k[0] (the shop id) is used directly as a position in keys_lst,
# which has len(shop_num) entries — this assumes shop ids are exactly
# 0..len(shop_num)-1; confirm before reusing with other data.
for k in input_dict_all.keys():
    keys_lst[k[0]].append(k)

In [111]:
keys_lst[2][:10]

[(2, 8764),
 (2, 7905),
 (2, 4420),
 (2, 11785),
 (2, 10093),
 (2, 2746),
 (2, 3329),
 (2, 1892),
 (2, 21901),
 (2, 13700)]

In [112]:
#split by shop keys
def filterByKey(keys):
    """Return the part of input_dict_all restricted to the given keys."""
    return {key: input_dict_all[key] for key in keys}

# One sub-dictionary per batch of keys, in the same order as keys_lst.
input_dict_lst = [filterByKey(batch_keys) for batch_keys in keys_lst]

In [113]:
input_dict_all[1,13346]

Unnamed: 0_level_0,date_block_num,shop_id,item_id,item_category_id,top_categories_id,item_price,item_cnt_day
item_id_idx,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
13346,0,1,13346,81,20,79.0,2.0
13346,1,1,13346,81,20,79.0,3.0


In [114]:
len(input_dict_lst)

60

In [115]:
rc.queue_status(targets='all', verbose=True)

{0: {'completed': ['80b152ba-0edc-44c0-9918-c90fb4649778'],
  'queue': [],
  'tasks': []},
 1: {'completed': ['9b9e4402-dfef-4b80-ab7e-6f07761c7585'],
  'queue': [],
  'tasks': []},
 2: {'completed': ['d54119b6-cc9c-49d3-91c5-5e88fd28834e'],
  'queue': [],
  'tasks': []},
 3: {'completed': ['4bc6443b-2736-4037-b800-d9610b71530f'],
  'queue': [],
  'tasks': []},
 4: {'completed': ['baed0c0d-2727-437c-ba1d-d97afb6a1dca'],
  'queue': [],
  'tasks': []},
 5: {'completed': ['e31a3a3a-72d6-496f-9b1f-7956abcc5a53'],
  'queue': [],
  'tasks': []},
 6: {'completed': ['ffbefe76-65b0-4d17-9357-37711fb08ecf'],
  'queue': [],
  'tasks': []},
 7: {'completed': ['0a16a4f1-d3f2-4024-9adc-ac41eabaa5da'],
  'queue': [],
  'tasks': []},
 8: {'completed': ['6f027a3e-4889-4511-988c-16e2810f8221'],
  'queue': [],
  'tasks': []},
 9: {'completed': ['d1051d98-e73d-46b9-a778-3bda87055301'],
  'queue': [],
  'tasks': []},
 'unassigned': []}

In [116]:
import time

In [117]:
cols

Index(['date_block_num', 'shop_id', 'item_id', 'item_category_id',
       'top_categories_id', 'item_price', 'item_cnt_day'],
      dtype='object')

In [118]:
# Repeat the month list and the column labels once per shop so they can be
# zipped with the per-shop dictionaries when building the task arguments.
month_num_map = [month_num] * len(shop_num)
cols_map = [cols] * len(shop_num)

In [121]:
#parallel process
t = time.time()
# Submit one complete_time_series call per (dict, months, columns) triple.
# The returned handle is polled with ready()/successful() and read with get()
# in later cells; it is not a dict.
complete_time_series_result = v.map(complete_time_series,zip(input_dict_lst,month_num_map,cols_map))
# NOTE(review): this times only the v.map call itself, which may be dominated
# by sending the input dictionaries to the engines rather than by the
# computation — confirm.
print("costed ", time.time() - t) 

costed  265.63323736190796


In [288]:
rc.ids

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [123]:
rc.queue_status(targets='all', verbose=True)

{0: {'completed': ['80b152ba-0edc-44c0-9918-c90fb4649778',
   '13161fca-3bbb-4480-986e-0a8618d0dc0c',
   'd3abbb17-55c7-4b0a-ac98-d453c3056a50',
   '423812a2-b8fa-4b1f-a6ab-dd3a957e2f09',
   'b0cf3c99-db06-4e9f-94e8-2a9d72868296'],
  'queue': [],
  'tasks': []},
 1: {'completed': ['9b9e4402-dfef-4b80-ab7e-6f07761c7585',
   'ccdda4a3-1d80-4268-a4af-ff2a232d32c3',
   'f6c82b20-6082-4ace-b1b6-1ca37358b927',
   '0365eba5-f542-4266-b661-336248b4167d',
   'bce8e56a-5f39-4918-8759-186fa97e9f2f',
   '1be15673-479f-4ae6-9799-eff4112d91ee',
   '608fce7e-4ca6-47a6-831e-afdf812829bb',
   'dcc3c434-4a68-4e5e-9947-834218fcb299'],
  'queue': [],
  'tasks': []},
 2: {'completed': ['d54119b6-cc9c-49d3-91c5-5e88fd28834e',
   'd2ca94f5-d350-4ce3-a347-b370e1db2b9e',
   'e6f4ed18-4714-451e-9791-58aa059aace5',
   '669584a2-0733-4277-aad4-52ecd7727336',
   '2f0cfa44-6138-4e27-8597-ecefb1f2798a',
   '1abe6cb3-ea41-4fd9-a0ad-b869fe464711'],
  'queue': [],
  'tasks': []},
 3: {'completed': ['4bc6443b-2736-4037-

In [124]:
complete_time_series_result.ready()

True

In [125]:
complete_time_series_result.successful()

True

In [126]:
results_2 = complete_time_series_result.get()

In [129]:
#save the results
save_obj(results_2,"data/results_long_process")

In [130]:
len(results_2)

60