# 生成用户日志

本代码的目的是把用户分组，并在每个组中统计用户的行为日志，以方便后续的并行化处理。

This code aims to group users into serveral groups, then statistic the user behaivors into each group. This process can simplify the operation in multi-processing  

In [1]:
import multiprocessing as mp
import time
import pandas as pd
import numpy as np



def reduce_mem_usage(df):
    """ iterate through all the columns of a dataframe and modify the data type
        to reduce memory usage.        
    """
    start_mem = df.memory_usage().sum() 
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
    
    for col in df.columns:
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype('category')

    end_mem = df.memory_usage().sum() 
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
    
    return df

def generate_logs_for_each_group(matrix, q):
    user_log = dict()
    for row in matrix:
        user_log.setdefault(row[0], [])
        user_log[row[0]].append(row[1])
    print('This batc is finished')
    q.put(user_log)

In [2]:
CPU_NUMS = 8

In [4]:
# round2 train的路径
path = '../ECommAI_EUIR_round2_train_20190816/'

In [5]:
data = reduce_mem_usage(pd.read_csv(path+'user_behavior.csv', header=None))
user = pd.read_csv(path+'user.csv', header=None)
item = pd.read_csv(path+'item.csv', header=None)

data['day'] = data[3] // 86400
data['hour'] = data[3] // 3600 % 24

data = data.drop(3, axis=1)

data.columns = ['userID','itemID','behavoir','day','hour']
user.columns = ['userID', 'sex', 'age', 'ability']
item.columns = ['itemID', 'category', 'shop', 'band']

data = data.drop_duplicates(['userID','itemID'],keep="last")
data = data.sort_values(['day','hour'], ascending=True).reset_index(drop=True)

users = list(set(user['userID']))

user_groups = [users[i: i + len(users) // CPU_NUMS] for i in range(0, len(users), len(users) // CPU_NUMS)]

q = mp.Queue()
for groupID in range(len(user_groups)):
    matrix = data[data['userID'].isin(user_groups[groupID])][['userID','itemID']].values
    task = mp.Process(target=generate_logs_for_each_group, args=(matrix, q, ))
    task.start()
    
start_time = time.time()
print('Waiting for the son processing')
while q.qsize() != len(user_groups):
    pass
end_time = time.time()
print("Over, the time cost is:"  + str(end_time - start_time))

Memory usage of dataframe is 2575214592.00 MB
Memory usage after optimization is: 1046181196.00 MB
Decreased by 59.4%
This batc is finished
This batc is finished
This batc is finished
This batc is finished
This batc is finished
This batc is finished
This batc is finished
This batc is finished
Waiting for the son processing
This batc is finished
Over, the time cost is:6.490148067474365


In [6]:
for i in range(len(user_groups)):
    temp = q.get()
    f = open('full_logs/userlogs_group' + str(i) + '.txt','w')
    f.write(str(temp))
    f.close()
