# 이전

In [None]:
import pandas as pd
import numpy as np
from tqdm.auto import tqdm

# col 생략 없이 출력
pd.set_option('display.max_columns', None)

log_df = pd.read_parquet("./dataset/real_log_for_spartan_optimized.parquet")
log_sort_df = log_df.sort_values(by=['CustomerID', 'Timestamp'])
log_sort_df = log_sort_df.reset_index(drop=True)
ad_click_df = log_sort_df[log_sort_df['ActionType']=='ad_click']
ad_click_df.loc[:, 'GoodsCode'] = ad_click_df['GoodsCode'].astype(pd.Int16Dtype())

# 결과를 저장할 빈 데이터프레임 생성
result_df = pd.DataFrame()

# `CustomerID`별로 그룹화하고 각 그룹에 대해 merge_asof 실행
for customer_id, group in tqdm(log_sort_df.groupby('CustomerID')):
    temp_ad_click = ad_click_df[ad_click_df['CustomerID'] == customer_id]
    merged_group = pd.merge_asof(group, temp_ad_click, on='Timestamp', tolerance=pd.Timedelta(days=7), direction='backward', suffixes=('', '_ad'))
    result_df = pd.concat([result_df, merged_group])

In [1]:
import dask.dataframe as dd
import pandas as pd
from tqdm.auto import tqdm

# 예시 데이터 로드 및 전처리 (Pandas)
log_df = pd.read_parquet("./dataset/real_log_for_spartan_optimized.parquet")

In [2]:
len(log_df)

3096931

In [2]:
log_df['CustomerID'].nunique()

228675

In [2]:
import dask.dataframe as dd
import pandas as pd
from tqdm.auto import tqdm

# 예시 데이터 로드 및 전처리 (Pandas)
log_df = pd.read_parquet("./dataset/real_log_for_spartan_optimized.parquet")
log_sort_df = log_df.sort_values(by=['CustomerID', 'Timestamp']).reset_index(drop=True)
ad_click_df = log_sort_df[log_sort_df['ActionType'] == 'ad_click']
ad_click_df.loc[:, 'GoodsCode'] = ad_click_df['GoodsCode'].astype(pd.Int16Dtype())

# Pandas DataFrame을 Dask DataFrame으로 변환
log_sort_ddf = dd.from_pandas(log_sort_df, npartitions=10)
ad_click_ddf = dd.from_pandas(ad_click_df, npartitions=10)

# 각 CustomerID별로 처리 후 결과를 리스트에 저장
results = []
# grouped = list(log_sort_ddf.groupby('CustomerID'))  # Dask groupby 결과를 리스트로 변환
for customer_id, group in log_sort_ddf.groupby('CustomerID'): # tqdm(grouped):  # tqdm으로 진행 상황 표시 
    temp_ad_click = ad_click_ddf[ad_click_ddf['CustomerID'] == customer_id]
    merged_group = dd.merge_asof(group, temp_ad_click, on='Timestamp', tolerance=pd.Timedelta(days=7), direction='backward', suffixes=('', '_ad'))
    results.append(merged_group)

# 모든 결과를 합침
final_result_ddf = dd.concat(results)

# 최종 결과를 확인 (Dask DataFrame을 Pandas DataFrame으로 변환)
final_result_df = final_result_ddf.compute()
print(final_result_df)


NotImplementedError: Iteration of DataFrameGroupBy objects requires computing the groups which may be slow. You probably want to use 'apply' to execute a function for all the columns. To access individual groups, use 'get_group'. To list all the group names, use 'df[<group column>].unique().compute()'.

In [1]:
import dask.dataframe as dd
import pandas as pd
from tqdm.auto import tqdm

# 예시 데이터 로드 및 전처리 (Pandas)
log_df = pd.read_parquet("./dataset/real_log_for_spartan_optimized.parquet")
log_sort_df = log_df.sort_values(by=['CustomerID', 'Timestamp']).reset_index(drop=True)
ad_click_df = log_sort_df[log_sort_df['ActionType'] == 'ad_click']
ad_click_df.loc[:, 'GoodsCode'] = ad_click_df['GoodsCode'].astype(pd.Int16Dtype())

# Pandas DataFrame을 Dask DataFrame으로 변환
log_sort_ddf = dd.from_pandas(log_sort_df, npartitions=10)
ad_click_ddf = dd.from_pandas(ad_click_df, npartitions=10)

# CustomerID 기준으로 데이터 파티션
log_sort_ddf = log_sort_ddf.shuffle('CustomerID')
ad_click_ddf = ad_click_ddf.shuffle('CustomerID')

# apply를 사용하여 각 CustomerID별로 처리
def process_group(group, ad_click_ddf):
    customer_id = group['CustomerID'].iloc[0]
    temp_ad_click = ad_click_ddf[ad_click_ddf['CustomerID'] == customer_id]
    return dd.merge_asof(group, temp_ad_click, on='Timestamp', tolerance=pd.Timedelta(days=7), direction='backward', suffixes=('', '_ad'))

results = log_sort_ddf.groupby('CustomerID').apply(process_group, ad_click_ddf=ad_click_ddf, meta=log_sort_ddf).compute()

display(results)

ValueError: left keys must be sorted

# 다시 시간 단축

In [1]:
import pandas as pd
from tqdm import tqdm

log_df = pd.read_parquet("./dataset/real_log_for_spartan_optimized.parquet")
log_sort_df = log_df.sort_values(by=['CustomerID', 'Timestamp'])
log_sort_df = log_sort_df.reset_index(drop=True)

In [2]:
log_sort_partitioin_dict = {}
for num in range(10):
    log_sort_partitioin_dict[f'df_{num}'] = log_sort_df[log_sort_df['CustomerID']%10 == num]

In [3]:
for num, df in log_sort_partitioin_dict.items():
    print(f"{num}, {len(df)}")

df_0, 310840
df_1, 311817
df_2, 308119
df_3, 312746
df_4, 310147
df_5, 309864
df_6, 308830
df_7, 306167
df_8, 310450
df_9, 307951


In [4]:
ad_click_partitioin_dict = {}
for df_name, _ in log_sort_partitioin_dict.items():
    ad_click_partitioin_dict[df_name] = log_sort_partitioin_dict[df_name][log_sort_partitioin_dict[df_name]['ActionType']=='ad_click']
    ad_click_partitioin_dict[df_name] = ad_click_partitioin_dict[df_name].astype({'GoodsCode':'Int16'})

In [5]:
# 결과를 저장할 빈 데이터프레임 생성
result_dict = {}
for df_name, log_df in tqdm(log_sort_partitioin_dict.items()):
    result_dict[df_name] = pd.DataFrame()
    print(df_name)
    # `CustomerID`별로 그룹화하고 각 그룹에 대해 merge_asof 실행
    for customer_id, group in tqdm(log_df.groupby('CustomerID')):
        temp_ad_click = ad_click_partitioin_dict[df_name][ad_click_partitioin_dict[df_name]['CustomerID'] == customer_id]
        merged_group = pd.merge_asof(group, temp_ad_click, on='Timestamp', tolerance=pd.Timedelta(days=7), direction='backward', suffixes=('', '_ad'))
        result_dict[df_name] = pd.concat([result_dict[df_name], merged_group])

  0%|          | 0/10 [00:00<?, ?it/s]

df_0


100%|██████████| 22950/22950 [02:25<00:00, 157.66it/s]
 10%|█         | 1/10 [02:25<21:52, 145.79s/it]

df_1


100%|██████████| 22964/22964 [02:25<00:00, 158.01it/s]
 20%|██        | 2/10 [04:51<19:24, 145.59s/it]

df_2


100%|██████████| 22786/22786 [02:21<00:00, 160.70it/s]
 30%|███       | 3/10 [07:13<16:48, 144.08s/it]

df_3


100%|██████████| 22935/22935 [02:26<00:00, 156.17it/s]
 40%|████      | 4/10 [09:40<14:31, 145.22s/it]

df_4


100%|██████████| 22894/22894 [02:26<00:00, 156.46it/s]
 50%|█████     | 5/10 [12:06<12:08, 145.65s/it]

df_5


100%|██████████| 22875/22875 [02:25<00:00, 157.61it/s]
 60%|██████    | 6/10 [14:32<09:42, 145.74s/it]

df_6


100%|██████████| 22922/22922 [02:26<00:00, 156.44it/s]
 70%|███████   | 7/10 [16:59<07:18, 146.03s/it]

df_7


100%|██████████| 22640/22640 [02:24<00:00, 156.48it/s]
 80%|████████  | 8/10 [19:24<04:51, 145.63s/it]

df_8


100%|██████████| 22886/22886 [02:26<00:00, 155.78it/s]
 90%|█████████ | 9/10 [21:51<02:26, 146.07s/it]

df_9


100%|██████████| 22823/22823 [02:25<00:00, 156.59it/s]
100%|██████████| 10/10 [24:18<00:00, 145.81s/it]


In [6]:
for df in result_dict.values():
    # print(df)
    display(df.head(3))

Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,30,view,1163,100790,,,2022-02-12 18:58:09.707955,,,,,,
1,30,view,1163,100790,,,2022-02-24 11:56:13.707955,,,,,,
2,30,basket,1163,100790,,,2022-02-26 19:07:45.707955,,,,,,


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,81,view,1334,10890,,,2022-05-02 17:18:45.297618,,,,,,
1,81,view,1334,10890,,,2022-05-10 03:52:06.297618,,,,,,
2,81,purchase,1334,10890,,,2022-06-06 17:14:05.297618,,,,,,


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,32,view,1434,18720,,,2022-02-10 00:45:33.376860,,,,,,
1,32,basket,1434,18720,,,2022-02-11 14:00:06.376860,,,,,,
2,32,purchase,1201,13550,,,2022-02-26 16:27:44.376860,,,,,,


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,13,view,1367,42850,,,2022-02-25 16:00:46.855443,,,,,,
1,13,purchase,1367,42850,,,2022-02-27 08:38:57.855443,,,,,,
2,13,ad_click,1090,21100,ad_google,158.68,2022-03-11 18:14:48.855443,13.0,ad_click,1090.0,21100.0,ad_google,158.68


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,4,view,1492,15020,,,2022-07-04 15:11:42.511967,,,,,,
1,4,view,1492,15020,,,2022-07-24 14:44:40.511967,,,,,,
0,14,view,1280,9270,,,2022-02-14 07:41:09.548570,,,,,,


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,55,view,1370,8190,,,2022-02-10 16:56:23.008345,,,,,,
1,55,view,1370,8190,,,2022-02-12 03:43:16.008345,,,,,,
2,55,view,1035,21440,,,2022-03-05 11:50:13.008345,,,,,,


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,6,view,1415,12980,,,2022-04-22 15:49:44.166991,,,,,,
1,6,purchase,1437,35290,,,2022-04-26 21:46:00.166991,,,,,,
2,6,view,1217,16220,,,2022-05-03 14:26:12.811783,,,,,,


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,17,purchase,1445,19950,,,2022-03-13 15:47:28.525882,,,,,,
1,17,view,1086,144050,,,2022-03-17 22:53:09.525882,,,,,,
2,17,basket,1094,100040,,,2022-03-20 15:30:33.525882,,,,,,


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,8,basket,1372,24930,,,2022-02-24 01:10:10.156038,,,,,,
1,8,view,1372,24930,,,2022-03-08 02:02:34.156038,,,,,,
2,8,purchase,1372,24930,,,2022-03-08 03:17:01.156038,,,,,,


Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,89,purchase,1087,136830,,,2022-03-13 14:17:01.534376,,,,,,
1,89,view,1087,136830,,,2022-03-14 11:27:23.534376,,,,,,
2,89,view,1487,19730,,,2022-03-30 22:26:08.534376,,,,,,


In [7]:
result_for_concat_list = []
for df in result_dict.values():
    result_for_concat_list.append(df)

In [8]:
finally_result = pd.concat(result_for_concat_list).sort_values(by=['CustomerID', 'Timestamp'])

In [9]:
finally_result

Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,CustomerID_ad,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
0,4,view,1492,15020,,,2022-07-04 15:11:42.511967,,,,,,
1,4,view,1492,15020,,,2022-07-24 14:44:40.511967,,,,,,
0,6,view,1415,12980,,,2022-04-22 15:49:44.166991,,,,,,
1,6,purchase,1437,35290,,,2022-04-26 21:46:00.166991,,,,,,
2,6,view,1217,16220,,,2022-05-03 14:26:12.811783,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
28,899999,view,1059,21230,,,2022-05-22 12:51:15.722389,,,,,,
29,899999,basket,1059,21230,,,2022-05-23 01:28:11.722389,,,,,,
30,899999,view,1059,21230,,,2022-05-24 04:40:26.722389,,,,,,
31,899999,purchase,1034,12910,,,2022-06-20 03:40:43.722389,,,,,,


In [None]:
# finally_result.to_parquet("result_all_df.parquet")

In [5]:
import pandas as pd

log_eval_all_df = pd.read_parquet("result_all_df.parquet")

In [7]:
import numpy as np

# null값 있어서 Int16으로 했는데 이 타입 있으면 np.select typeerror나서 null->-1로 하고 타입변환
log_eval_all_df['GoodsCode_ad'] = log_eval_all_df['GoodsCode_ad'].fillna(-1)
log_eval_all_df['GoodsCode_ad'] = log_eval_all_df['GoodsCode_ad'].astype('int16')

# 조건 리스트
conditions = [
    (log_eval_all_df['GoodsCode_ad']==-1) & (log_eval_all_df['ActionType']=='view'), # not_ad_view
    (log_eval_all_df['GoodsCode_ad']==-1) & (log_eval_all_df['ActionType']=='basket'), # not_ad_basket
    (log_eval_all_df['GoodsCode_ad']==-1) & (log_eval_all_df['ActionType']=='purchase'), # not_ad_purchase
    ((log_eval_all_df['GoodsCode_ad']!=-1) & (log_eval_all_df['ActionType']=='ad_click')), # ad_click
    ((log_eval_all_df['GoodsCode_ad']!=-1) & (log_eval_all_df['ActionType']=='purchase')
     & (log_eval_all_df['GoodsCode']==log_eval_all_df['GoodsCode_ad'])), # ad_purchase
    ((log_eval_all_df['GoodsCode_ad']!=-1) & (log_eval_all_df['ActionType']=='purchase') 
     & (log_eval_all_df['GoodsCode']!=log_eval_all_df['GoodsCode_ad'])), # ad_purchase_other
    ((log_eval_all_df['GoodsCode_ad']!=-1) & (log_eval_all_df['ActionType']=='view') 
     & (log_eval_all_df['GoodsCode']==log_eval_all_df['GoodsCode_ad'])), # ad_view
    ((log_eval_all_df['GoodsCode_ad']!=-1) & (log_eval_all_df['ActionType']=='basket') 
     & (log_eval_all_df['GoodsCode']==log_eval_all_df['GoodsCode_ad'])), # ad_basket
    ((log_eval_all_df['GoodsCode_ad']!=-1) & (log_eval_all_df['ActionType']=='view') 
     & (log_eval_all_df['GoodsCode']!=log_eval_all_df['GoodsCode_ad'])), # ad_view_other
    ((log_eval_all_df['GoodsCode_ad']!=-1) & (log_eval_all_df['ActionType']=='basket') 
     & (log_eval_all_df['GoodsCode']!=log_eval_all_df['GoodsCode_ad'])), # ad_basket_other
    
]

# 조건에 따른 값 리스트
choices = ['not_ad_view', 'not_ad_basket', 'not_ad_purchase', 
           'ad_click', 'ad_purchase', 'ad_purchase_other', 
           'ad_view', 'ad_basket', 'ad_view_other', 'ad_basket_other']

# numpy.select를 사용하여 새 컬럼에 조건부 값 할당
log_eval_all_df = log_eval_all_df.assign(eval_type=np.select(conditions, choices, default='Fail'))

# 혹시 몰라 결과 공유하기 위해서 저장
# log_eval_all_df.to_parquet("log_eval_all_df.parquet")

In [2]:
import pandas as pd

log_eval_all_df = pd.read_parquet("log_eval_all_df.parquet")
log_eval_all_df = log_eval_all_df.drop(['CustomerID_ad', 'ActionType_ad'], axis=1)

In [8]:
log_eval_all_df[log_eval_all_df['GoodsCode_ad']!=-1]

Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad,eval_type
5,6,ad_click,1437,35290,ad_google,182.31,2022-05-07 21:00:59.166991,1437,35290.0,ad_google,182.31,ad_click
6,6,view,1437,35290,,,2022-05-12 10:25:41.166991,1437,35290.0,ad_google,182.31,ad_view
2,13,ad_click,1090,21100,ad_google,158.68,2022-03-11 18:14:48.855443,1090,21100.0,ad_google,158.68,ad_click
3,13,view,1203,85290,,,2022-03-17 14:31:19.855443,1090,21100.0,ad_google,158.68,ad_view_other
4,14,ad_click,1000,12580,ad_google,104.04,2022-03-07 10:33:24.548570,1000,12580.0,ad_google,104.04,ad_click
...,...,...,...,...,...,...,...,...,...,...,...,...
16,899993,view,1045,120410,,,2022-05-11 03:04:39.372529,1019,16840.0,ad_meta,88.20,ad_view_other
18,899999,ad_click,1059,21230,ad_meta,71.35,2022-04-06 21:09:47.722389,1059,21230.0,ad_meta,71.35,ad_click
19,899999,view,1271,92940,,,2022-04-10 18:07:56.722389,1059,21230.0,ad_meta,71.35,ad_view_other
25,899999,ad_click,1059,21230,ad_google,94.14,2022-05-06 00:08:17.722389,1059,21230.0,ad_google,94.14,ad_click


# by중복있으면 안되는 것 아니였나? 다른 곳에선 또 되는 것 같아서 다시 확인(확인 결과 중복이 문제가 아니라 on의 컬럼이 가장 첫 기준으로 정렬되어있어야 하는 듯)

In [1]:
import pandas as pd
from tqdm import tqdm

log_df = pd.read_parquet("./dataset/real_log_for_spartan_optimized.parquet")
log_sort_df = log_df.sort_values(by=['CustomerID', 'Timestamp'])
log_sort_df = log_sort_df.reset_index(drop=True)

In [21]:
merge_log_sort_df = log_sort_df.sort_values(by='Timestamp')

In [22]:
merge_ad_click = merge_log_sort_df[merge_log_sort_df['ActionType']=='ad_click'].astype({'GoodsCode':'Int16'})

In [24]:
merge_simple = pd.merge_asof(merge_log_sort_df, merge_ad_click, on='Timestamp', by='CustomerID', tolerance=pd.Timedelta(days=7), direction='backward', suffixes=('', '_ad'))

In [25]:
merge_simple = merge_simple.sort_values(by=['CustomerID', 'Timestamp'])
merge_simple

Unnamed: 0,CustomerID,ActionType,GoodsCode,Price,AdID,CPC,Timestamp,ActionType_ad,GoodsCode_ad,Price_ad,AdID_ad,CPC_ad
3081601,4,view,1492,15020,,,2022-07-04 15:11:42.511967,,,,,
3090577,4,view,1492,15020,,,2022-07-24 14:44:40.511967,,,,,
1627572,6,view,1415,12980,,,2022-04-22 15:49:44.166991,,,,,
1773519,6,purchase,1437,35290,,,2022-04-26 21:46:00.166991,,,,,
1998478,6,view,1217,16220,,,2022-05-03 14:26:12.811783,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...
2552733,899999,view,1059,21230,,,2022-05-22 12:51:15.722389,,,,,
2565501,899999,basket,1059,21230,,,2022-05-23 01:28:11.722389,,,,,
2592523,899999,view,1059,21230,,,2022-05-24 04:40:26.722389,,,,,
3016105,899999,purchase,1034,12910,,,2022-06-20 03:40:43.722389,,,,,
