# 準備

In [1]:
# ライブラリの読み込み
import numpy as np
import pandas as pd
import random
import pickle
import multiprocessing
from datetime import datetime
from datetime import timedelta
from time import time

In [5]:
# デモデータ作成
# ランダムなIDと日時
# df_userは10,000行
# df_logは100,000行

random.seed(256)

# ランダムID
r1 = [random.randint(1, 10000) for i in range(10000)]
r1.sort()
r2 = [random.randint(1, 10000) for i in range(100000)]
r2.sort()

# ランダム日時
now = datetime.now()
datetime1 = [now-timedelta(random.randint(1, 1000)) for i in range(10000)]
datetime2 = [now-timedelta(random.randint(1, 1000)) for i in range(100000)]

# user
df_user = pd.DataFrame({
    "userID": r1,
    "trigger_dt": datetime1
})

# log
df_log = pd.DataFrame({
    "userID": r2,
    "event_dt": datetime2
})

In [22]:
df_user.head()

Unnamed: 0,userID,trigger_dt,event_cnt
0,1,2018-11-01 14:12:11.353445,8.0
1,3,2018-11-18 14:12:11.353445,14.0
2,6,2018-07-30 14:12:11.353445,5.0
3,7,2018-01-19 14:12:11.353445,8.0
4,8,2016-11-22 14:12:11.353445,2.0


In [23]:
df_log.head()

Unnamed: 0,userID,event_dt
0,1,2016-11-27 14:12:11.353445
1,1,2017-11-03 14:12:11.353445
2,1,2017-06-11 14:12:11.353445
3,1,2017-05-17 14:12:11.353445
4,1,2018-08-21 14:12:11.353445


# 時間短縮前

In [8]:
# before
# 結果を代入する列を作成
df_user["event_cnt"] = np.nan

# メイン処理
t1 =time()
for i in range(df_user.shape[0]):
    # df_userの対象行のuserIDを取得
    user_id = df_user["userID"].iloc[i]
    # df_userの対象行のuserIDと同じか否かのTrue, Falseを作成
    user_in_log = df_log["userID"]==user_id
    # df_userの対象行のユーザーかつ対象行のtrigger以前に起こったeventの数を代入
    df_user["event_cnt"].iloc[i] = df_log[(user_in_log) & (df_log["event_dt"]<df_user["trigger_dt"].iloc[i])].shape[0]

    # 進捗を表示
    if i%1000 == 0:
        print("{}...time: {}".format(i, time()-t1))

# かかった時間を表示
print("process_finish: {}".format(time()-t1))

# 結果を保存
with open("result.pickle", mode='wb') as f:
    pickle.dump(df_user, f)


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self._setitem_with_indexer(indexer, value)


0...time: 0.06981420516967773
1000...time: 56.50749945640564
2000...time: 113.47105884552002
3000...time: 168.21672081947327
4000...time: 222.70379543304443
5000...time: 278.85570669174194
6000...time: 334.8989362716675
7000...time: 390.94702911376953
8000...time: 447.9054775238037
9000...time: 506.94193148612976
process_finish: 565.0461707115173


## 結果確認

In [24]:
df_user.head()

Unnamed: 0,userID,trigger_dt,event_cnt
0,1,2018-11-01 14:12:11.353445,8.0
1,3,2018-11-18 14:12:11.353445,14.0
2,6,2018-07-30 14:12:11.353445,5.0
3,7,2018-01-19 14:12:11.353445,8.0
4,8,2016-11-22 14:12:11.353445,2.0


In [10]:
# ある行を抜き出し日時と代入結果を確認
i = 0
user_id = df_user["userID"].iloc[i]
df_user.iloc[i]

userID                                 1
trigger_dt    2018-11-01 14:12:11.353445
event_cnt                              8
Name: 0, dtype: object

In [11]:
# 対象行のuserIDのlogの中でtrigger日時以前の行を数える
# 代入結果と一致するか確認
df_log[df_log["userID"]==user_id].sort_values("event_dt")

Unnamed: 0,userID,event_dt
8,1,2016-08-29 14:12:11.353445
0,1,2016-11-27 14:12:11.353445
3,1,2017-05-17 14:12:11.353445
5,1,2017-06-03 14:12:11.353445
2,1,2017-06-11 14:12:11.353445
1,1,2017-11-03 14:12:11.353445
7,1,2017-12-24 14:12:11.353445
4,1,2018-08-21 14:12:11.353445
6,1,2019-04-07 14:12:11.353445


# 時間短縮後

In [16]:
# after
# 結果を代入する列を作成
df_user["event_cnt"] = np.nan

# 前もってSeriesを抽出しておく
s_user_userID = df_user["userID"]
s_user_dt = df_user["trigger_dt"]
s_log_userID = df_log["userID"]

# multiprocessing用にindexをd分割したリストを作成
d = 15
x = int(np.linspace(0, df_user.shape[0], d)[1])
id_lists = [[i*x, (i+1)*x] for i in range(d-1)]
id_lists.append([id_lists[-1][-1], df_user.shape[0]])

# メイン処理
# multiprocessing用に処理を関数化
def event_cnt(id_list):
    t1 =time()
    for i in range(id_list[0], id_list[1]):
        # df_userの対象行のuserIDを取得
        user_id = s_user_userID.iloc[i]
        # df_userの対象行のuserIDと同じか否かのTrue, Falseを作成
        user_in_log = s_log_userID==user_id
        # df_logから対象userの行だけ抽出しておく
        df_1 = df_log[user_in_log]
        # 対象userの行がdf_logに存在する場合のみ代入を行う
        if df_1.shape[0]>0:
            # df_userの対象行のユーザーかつ対象行のtrigger以前に起こったeventの数を代入
            df_user.at[i, "event_cnt"] = len(df_1[df_1["event_dt"]<s_user_dt.iloc[i]])
            
        # 進捗を表示
        if i%1000 == 0:
            print("process_{}-{}: {}...time: {}".format(id_list[0], id_list[1], i, time()-t1))

    # かかった時間を表示
    print("process_{}-{}_finish: {}".format(id_list[0], id_list[1], time()-t1))

    # 結果を保存
    with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='wb') as f:
        pickle.dump(df_user, f)

# 並列処理実行
with multiprocessing.Pool(d) as pool:
    pool.map(event_cnt, id_lists)


process_0-714: 0...time: 0.011498212814331055
process_4998-5712: 5000...time: 0.01985907554626465
process_9996-10000_finish: 0.02092766761779785
process_2856-3570: 3000...time: 0.3586902618408203
process_7854-8568: 8000...time: 0.3677937984466553
process_714-1428: 1000...time: 0.49004507064819336
process_5712-6426: 6000...time: 0.6779203414916992
process_3570-4284: 4000...time: 0.8131577968597412
process_6426-7140: 7000...time: 0.964698076248169
process_8568-9282: 9000...time: 1.0305662155151367
process_6426-7140_finish: 1.1923942565917969
process_9282-9996_finish: 1.3662109375
process_1428-2142: 2000...time: 1.3731131553649902
process_8568-9282_finish: 1.4752998352050781
process_3570-4284_finish: 1.4812278747558594
process_714-1428_finish: 1.488349437713623
process_0-714_finish: 1.4931654930114746
process_4284-4998_finish: 1.5344908237457275
process_7854-8568_finish: 1.5600390434265137
process_2142-2856_finish: 1.575411319732666
process_7140-7854_finish: 1.5844135284423828
process_571

## 結果統合

In [17]:
# multiprocessing用にindexをd分割したリストを作成
d = 15
x = int(np.linspace(0, df_user.shape[0], d)[1])
id_lists = [[i*x, (i+1)*x] for i in range(d-1)]
id_lists.append([id_lists[-1][-1], df_user.shape[0]])

In [18]:
# 読み込み・統合
id_list = id_lists[0]
with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='rb') as f:
    df_user = pickle.load(f)

for id_list in id_lists[1:]:
    with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='rb') as f:
        df_user_tmp = pickle.load(f)
        
    # 結果を統合
    df_user["event_cnt"].iloc[id_list[0]:id_list[1]] = df_user_tmp["event_cnt"].iloc[id_list[0]:id_list[1]]

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self._setitem_with_indexer(indexer, value)


## 結果確認

In [25]:
df_user.head()

Unnamed: 0,userID,trigger_dt,event_cnt
0,1,2018-11-01 14:12:11.353445,8.0
1,3,2018-11-18 14:12:11.353445,14.0
2,6,2018-07-30 14:12:11.353445,5.0
3,7,2018-01-19 14:12:11.353445,8.0
4,8,2016-11-22 14:12:11.353445,2.0


In [20]:
# ある行を抜き出し日時と代入結果を確認
i = 0
user_id = df_user["userID"].iloc[i]
df_user.iloc[i]

userID                                 1
trigger_dt    2018-11-01 14:12:11.353445
event_cnt                              8
Name: 0, dtype: object

In [21]:
# 対象行のuserIDのlogの中でtrigger日時以前の行を数える
# 代入結果と一致するか確認
df_log[df_log["userID"]==user_id].sort_values("event_dt")

Unnamed: 0,userID,event_dt
8,1,2016-08-29 14:12:11.353445
0,1,2016-11-27 14:12:11.353445
3,1,2017-05-17 14:12:11.353445
5,1,2017-06-03 14:12:11.353445
2,1,2017-06-11 14:12:11.353445
1,1,2017-11-03 14:12:11.353445
7,1,2017-12-24 14:12:11.353445
4,1,2018-08-21 14:12:11.353445
6,1,2019-04-07 14:12:11.353445
