In [14]:
import os
import glob
import pandas as pd
import numpy as np
import dask.dataframe as dd

In [26]:
def list_files_in_directory(directory, pattern='*'):
    # 使用 glob 模組列出資料夾中符合模式的檔案
    files = glob.glob(os.path.join(directory, pattern))
    return files

In [27]:
behavior_folder = '/home/namwoam/nas/homes/namwoam_data/bda-final/behavior'
behavior_files = list_files_in_directory(behavior_folder)
behavior_files

['/home/namwoam/nas/homes/namwoam_data/bda-final/behavior/2023-10',
 '/home/namwoam/nas/homes/namwoam_data/bda-final/behavior/2023-11',
 '/home/namwoam/nas/homes/namwoam_data/bda-final/behavior/2024-01',
 '/home/namwoam/nas/homes/namwoam_data/bda-final/behavior/2024-02',
 '/home/namwoam/nas/homes/namwoam_data/bda-final/behavior/2023-12',
 '/home/namwoam/nas/homes/namwoam_data/bda-final/behavior/2023-09']

In [37]:
import random
dtype_spec = {
    'CategoryId': 'str',
    'ContentId': 'str',
    'ContentName': 'str',
    'ContentType': 'str',
    'PageType': 'str',
    'RegisterTunnel': 'str',
    'SearchTerm': 'str',
    'TradesGroupCode': 'str'
}
folder = list_files_in_directory(behavior_files[0])
print("Folder:", len(folder))
folder = random.sample(folder, 10)
dask_df = dd.read_csv(folder, dtype=dtype_spec, blocksize="64MB")
print("All df size:", len(dask_df.index))
dask_df = dask_df.sort_values(by=['FullvisitorId', 'EventTime'])
dask_df = dask_df.compute()
dask_df['EventTime'] = dd.to_datetime(dask_df['EventTime'])
dask_df = dask_df.reset_index(drop=True)
dask_df = dask_df.drop(columns=['Tunnel', 'Device', 'DeviceId', 'RegisterTunnel',
                       'SearchTerm', 'ContentType', 'ContentName', 'ContentId'])

Folder: 253
All df size: 1000000


In [38]:
def filter_userid(df, idx):
    return df[df['FullvisitorId'] == idx]

In [39]:
def calculate_time_to_checkout(df):
    first_action_time = None
    actions_in_interval = []
    intervals = []
    for _, row in df.iterrows():
        if row['Behavior'] == 'purchase':
            if first_action_time:
                # record datapoint
                interval = row['EventTime'] - first_action_time
                intervals.append({
                    'customer_id': row['FullvisitorId'],
                    'actions': actions_in_interval.copy(),
                    'start_time': first_action_time,
                    'end_time': row['EventTime'],
                    'time_to_checkout': interval,
                    'trade_order': row['TradesGroupCode']
                })
            # renew first action
            first_action_time = row['EventTime']
            actions_in_interval = []
        else:
            if not first_action_time:
                first_action_time = row['EventTime']
            actions_in_interval.append(row['Behavior'])

    return pd.DataFrame(intervals)

In [40]:
def process_result(df):
    df['mean_time_to_check'] = df['time_to_checkout'].mean()
    df['hesitate'] = (df['time_to_checkout'] > df['mean_time_to_check']).astype(int)
    return df

In [56]:
folder = list_files_in_directory(behavior_files[0])
print(len(folder))
dask_df = dd.read_csv(folder, dtype=dtype_spec)
dask_df = dask_df.drop(columns=['Tunnel', 'Device', 'DeviceId', 'RegisterTunnel', 'SearchTerm', 'ContentType', 'ContentName', 'ContentId'])
dask_df.head()

253


Unnamed: 0,ShopId,ShopMemberId,FullvisitorId,HitTime,Behavior,CategoryId,SalePageId,UnitPrice,Qty,TotalSalesAmount,TradesGroupCode,PageType,EventTime
0,NOmceSRCGAE2GjCOCwaCrA==,TisggK99g9VoSBNxuQC3diInFh33XejoUXfzsEtrAqg=,EnnzFYKeLfp1RHP344a7B3++rvhlJqFYUZr1ck2UmqcLBL...,2023-10-02 21:55:28.934,viewecoupondetail,596196.0,,,,,,,2023-10-02 22:00:32.118
1,NOmceSRCGAE2GjCOCwaCrA==,1WY6oaz7DhmzKYzSRoILK2Prtr9v4VxTLnt5DQDzERo=,05FPUeiFfgpwKjL+Vc0yVBhYqyXES673MM5J2B9XeUgVFU...,2023-10-17 18:00:33.433,viewecoupondetail,603543.0,,,,,,,2023-10-17 18:00:40.580
2,NOmceSRCGAE2GjCOCwaCrA==,gJj5tvejNHJerW6ETcC91mXFb8/YN29acBAUECv00sc=,RwfKpbBk4/3lo8PSB6CTsNR5graqN/d1HNSGYY2mHbb1Wv...,2023-10-29 18:02:39.101,viewecoupondetail,,,,,,,,2023-10-29 18:02:54.864
3,NOmceSRCGAE2GjCOCwaCrA==,,FX6eNjJpkak9+f+iaPzPw/tYD7yAqZzjW2UJ2X7nUqCoa3...,2023-10-09 14:09:57.471,viewecoupondetail,,,,,,,,2023-10-09 14:10:27.681
4,NOmceSRCGAE2GjCOCwaCrA==,k5FicMluO0vFxdwFQo8ZKe3SGpHp000nUGvOs4DLBtQ=,QwogiUD+jY7zV/83YaP1fZJEQXEnHU1ylMatUSaXxS9cIC...,2023-10-18 10:28:50.539,viewecoupondetail,603549.0,,,,,,,2023-10-18 10:28:54.691


In [54]:
visitor_id = dask_df['FullvisitorId']
print("Visitor count:", len(dask_df.index))
id_set = list(set(np.array(visitor_id.compute())))
id_set

KeyboardInterrupt: 

In [51]:
result_df = pd.DataFrame()
for i in range(len(id_set)):
    temp = dask_df.map_partitions(filter_userid, id_set[i]).compute()
    temp['EventTime'] = pd.to_datetime(temp['EventTime'])
    temp = temp.sort_values('EventTime')
    result = temp.groupby('FullvisitorId').apply(calculate_time_to_checkout).reset_index(drop=True)
    result_df = pd.concat([result_df, result])
result_df = process_result(result_df)

ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.. If you don't want the partitions to be aligned, and are calling `map_partitions` directly, pass `align_dataframes=False`.

# 示範範例

In [None]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# 生成示例数据
np.random.seed(42)

num_records = 100
fullvisitor_ids = np.random.choice(range(1, 21), size=num_records)  # 20个访客
events = np.random.choice(['view', 'click', 'purchase'], size=num_records, p=[0.6, 0.3, 0.1])
start_date = datetime(2024, 1, 1)
event_times = [start_date + timedelta(minutes=np.random.randint(0, 60*24*30)) for _ in range(num_records)]

# 创建Pandas DataFrame
df = pd.DataFrame({
    'FullvisitorId': fullvisitor_ids,
    'Behavior': events,
    'EventTime': event_times
})

# 转换为Dask DataFrame并设置分区数
ddf = dd.from_pandas(df, npartitions=2).compute()
ddf = ddf.sort_values(['FullvisitorId', 'EventTime'])
result = ddf.groupby('FullvisitorId').apply(calculate_time_to_checkout).reset_index(drop=True)
result['mean_time_to_check'] = result['time_to_checkout'].mean()
result['hesitate'] = (result['time_to_checkout'] > result['mean_time_to_check']).astype(int)
result

  result = ddf.groupby('FullvisitorId').apply(calculate_time_to_checkout).reset_index(drop=True)


Unnamed: 0,customer_id,actions,start_time,end_time,time_to_checkout,mean_time_to_check,hesitate
0,2.0,"[click, view, view, view]",2024-01-01 19:14:00,2024-01-08 10:49:00,6 days 15:35:00,10 days 11:45:48,0
1,7.0,"[click, click, click]",2024-01-01 11:39:00,2024-01-08 16:53:00,7 days 05:14:00,10 days 11:45:48,0
2,8.0,"[view, view, view, view, view, view, click, view]",2024-01-03 09:16:00,2024-01-26 23:30:00,23 days 14:14:00,10 days 11:45:48,1
3,10.0,"[view, view]",2024-01-04 21:20:00,2024-01-14 16:58:00,9 days 19:38:00,10 days 11:45:48,0
4,12.0,"[view, view]",2024-01-04 06:55:00,2024-01-09 11:03:00,5 days 04:08:00,10 days 11:45:48,0


In [None]:
ddf[ddf['FullvisitorId']==10]

Unnamed: 0,FullvisitorId,Behavior,EventTime
59,10,view,2024-01-04 21:20:00
20,10,view,2024-01-09 22:34:00
54,10,purchase,2024-01-14 16:58:00
89,10,view,2024-01-26 10:31:00
