In [26]:
# 每一条访问记录，作为一条训练数据。
# Y = transactionRevenue/1000000




In [27]:
import pandas as pd
import numpy as np
import json
from sklearn.preprocessing import LabelEncoder
from tqdm._tqdm_notebook import tqdm_notebook as bar;
bar().pandas("track progress")

from joblib import Parallel, delayed
from multiprocessing import Pool, cpu_count
import time

label = LabelEncoder()

dir='/Users/xinwang/ai/dataset/kaggle/GStore/'
train_file = 'train.csv'
test_file = 'test.csv'

train = pd.read_csv(dir + train_file, low_memory=False)
test = pd.read_csv(dir + test_file, low_memory=False)
# train.set_index('sessionId')
# test.set_index('sessionId')

print('train.shape',train.shape)
print('test.shape',test.shape)
print()

train.drop_duplicates(subset='sessionId', keep='last', inplace=True)
test.drop_duplicates(subset='sessionId', keep='last', inplace=True)

cate_features = []
basic_num_features = []
advance_num_features = []

desc_features = ['fullVisitorId','sessionId','visitId','visitStartTime']

train['fullVisitorId'] = train['fullVisitorId'].astype(str)
test['fullVisitorId'] = test['fullVisitorId'].astype(str)

train['channelGrouping'] = label.fit_transform(train['channelGrouping'])
test['channelGrouping'] = label.fit_transform(test['channelGrouping'])

cate_features.append('channelGrouping')

def label_transform(df, col_list):
    for col in col_list:
        df[col] = label.fit_transform(df[col])

    return df

####################################
date_cate_features = ['date','year']
date_num_features = ['month','day','week','weekofyear','dayofweek','quarter','month_start','month_end']

def process_datetime(df):
    df['datetime'] = pd.to_datetime(df['date'], format='%Y%m%d',errors='ignore')
    df['year'] = df['datetime'].dt.year
    df['month'] = df['datetime'].dt.month
    df['month'] =df['month'].astype(int)
    
    df['day'] = df['datetime'].dt.day
    df['day'] = df['day'].astype(int)

    df['week'] = df['datetime'].dt.week
    df['week'] = df['week'].astype(int)
    
    df['weekofyear'] = df['datetime'].dt.weekofyear
    df['weekofyear'] = df['weekofyear'].astype(int)
    
    df['dayofweek'] = df['datetime'].dt.dayofweek
    df['dayofweek'] = df['dayofweek'].astype(int)
    
    df['quarter'] = df['datetime'].dt.quarter
    df['quarter'] = df['quarter'].astype(int)
    
    df['month_start'] = df['day'].apply(lambda x: 1 if x<=7 else 0).astype(int)
    df['month_end'] = df['day'].apply(lambda x: 1 if x>=25 else 0).astype(int)

    df = label_transform(df, date_cate_features)
    
    return df

train = process_datetime(train)
test = process_datetime(test)
cate_features += date_cate_features
basic_num_features += date_num_features

print('process_datetime done')


################device####################
device_features = ['browser','operatingSystem','isMobile','deviceCategory']

def process_device(df):
    df['browser'] = df['device'].apply(lambda x: json.loads(x)['browser'])
    df['operatingSystem'] = df['device'].apply(lambda x: json.loads(x)['operatingSystem'])
    df['isMobile'] = df['device'].apply(lambda x: json.loads(x)['isMobile'])
    df['deviceCategory'] = df['device'].apply(lambda x: json.loads(x)['deviceCategory'])

    df = label_transform(df, device_features)

    return df

train = process_device(train)
test = process_device(test)
cate_features += device_features
print('process_device done')


###############geoNetwork#####################
geo_features = ['continent','subContinent','country','region','metro','city','networkDomain']

def process_geo(df):
    df['continent'] = df['geoNetwork'].apply(lambda x: json.loads(x)['continent'])
    df['subContinent'] = df['geoNetwork'].apply(lambda x: json.loads(x)['subContinent'])
    df['country'] = df['geoNetwork'].apply(lambda x: json.loads(x)['country'])
    df['region'] = df['geoNetwork'].apply(lambda x: json.loads(x)['region'])
    df['metro'] = df['geoNetwork'].apply(lambda x: json.loads(x)['metro'])
    df['city'] = df['geoNetwork'].apply(lambda x: json.loads(x)['city'])
    df['networkDomain'] = df['geoNetwork'].apply(lambda x: json.loads(x)['networkDomain'])

    df = label_transform(df, geo_features)

    return df

train = process_geo(train)
test = process_geo(test)
cate_features += geo_features
print('process_geo done')


################totals####################
view_features = ['hits','pageviews','newVisits','bounces','visitNumber','transactionRevenue','buy_or_not']

def process_totals(df):
    df['hits'] = df['totals'].apply(lambda x: json.loads(x)['hits']).astype(int)
    df['pageviews'] = df['totals'].apply(lambda x: json.loads(x)['pageviews'] if x.find('pageviews')>=0 else 0).astype(int)
    df['bounces'] = df['totals'].apply(lambda x: json.loads(x)['bounces'] if x.find('bounces')>=0 else 0).astype(int)
    df['newVisits'] = df['totals'].apply(lambda x: json.loads(x)['newVisits'] if x.find('newVisits')>=0 else 0).astype(int)
    df['transactionRevenue'] = df['totals'].apply(lambda x: json.loads(x)['transactionRevenue'] if x.find('transactionRevenue')>=0 else 0).astype(int)
    df['buy_or_not'] = df['transactionRevenue'].apply(lambda x: 1 if x>0 else 0).astype(int)
    df['visitNumber'] = df['visitNumber'].astype(int)
    
    return df

train = process_totals(train)
test = process_totals(test)
basic_num_features += view_features
print('process_totals done')


################totals####################
last_time_features = ['last_seconds','last_minutes']

def process_last_time(df):
    df['last_seconds'] = df['visitStartTime']-df['visitId']
    df['last_minutes'] = (df['visitStartTime']-df['visitId'])/60
    df['last_minutes'] = df['last_minutes'].astype(np.int64)

    return df

train = process_last_time(train)
test = process_last_time(test)
basic_num_features += last_time_features
print('process_last_time done')


def parse_adwordsClickInfo_field(x, field):
    jo = json.loads(x)
    
    if x.find('adwordsClickInfo')>=0:
        adwordsClickInfo = jo['adwordsClickInfo']
        
        if str(adwordsClickInfo).find(field)>=0:
            return adwordsClickInfo[field]

    return 0

def parse_adwordsClickInfo_page(x):
    return parse_adwordsClickInfo_field(x, 'page')

def parse_adwordsClickInfo_slot(x):
    return parse_adwordsClickInfo_field(x, 'slot')

def parse_adwordsClickInfo_gclId(x):
    return parse_adwordsClickInfo_field(x, 'gclId')

def parse_adwordsClickInfo_adNetworkType(x):
    return parse_adwordsClickInfo_field(x, 'adNetworkType')

def parse_adwordsClickInfo_isVideoAd(x):
    return parse_adwordsClickInfo_field(x, 'isVideoAd')

traffic_features = ['campaign','source','medium','keyword','adwordsClickInfo_gclId_prefix','adwordsClickInfo_slot',
                    'adwordsClickInfo_gclId','adwordsClickInfo_adNetworkType']

def process_traffic(df):
    df['campaign'] = df['trafficSource'].progress_apply(lambda x: json.loads(x)['campaign']).astype(str)
    # need to merge nearly same record
    df['source'] = df['trafficSource'].apply(lambda x: json.loads(x)['source']).astype(str)
    df['medium'] = df['trafficSource'].apply(lambda x: json.loads(x)['medium']).astype(str)
    # need to merge some keywords
    df['keyword'] = df['trafficSource'].progress_apply(lambda x: json.loads(x)['keyword'] if x.find('keyword')>=0 else 0).astype(str)

    df['adwordsClickInfo_page'] = df['trafficSource'].apply(parse_adwordsClickInfo_page).astype(int)
    df['adwordsClickInfo_slot'] = df['trafficSource'].apply(parse_adwordsClickInfo_slot).astype(str)
    df['adwordsClickInfo_gclId'] = df['trafficSource'].apply(parse_adwordsClickInfo_gclId).astype(str)
    df['adwordsClickInfo_gclId_prefix'] = df['adwordsClickInfo_gclId'].apply(lambda x: x.split('_')[0] if type(x)!=int and x.find('_')>=0 else 0).astype(str)
    df['adwordsClickInfo_adNetworkType'] = df['trafficSource'].apply(parse_adwordsClickInfo_adNetworkType).astype(str)

    df = label_transform(df, traffic_features)
    
    return df

train = process_traffic(train)
test = process_traffic(test)

cate_features += traffic_features
basic_num_features.append('adwordsClickInfo_page')
print('process_traffic done')


removed_columns = ['device','geoNetwork','socialEngagementType','totals','trafficSource']
train.drop(removed_columns, axis=1, inplace=True)
test.drop(removed_columns, axis=1, inplace=True)


print('train.shape',train.shape)
print('test.shape',test.shape)
print()



def print_mem(df):
    print('-'*80)
    usage_mb = df.memory_usage(deep=True) / 1024 ** 2
    print(usage_mb)
    print('-'*80)

HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))


train.shape (903653, 12)
test.shape (804684, 12)

process_datetime done
process_device done
process_geo done
process_totals done
process_last_time done


HBox(children=(IntProgress(value=0, max=902755), HTML(value='')))




HBox(children=(IntProgress(value=0, max=902755), HTML(value='')))




HBox(children=(IntProgress(value=0, max=803863), HTML(value='')))




HBox(children=(IntProgress(value=0, max=803863), HTML(value='')))


process_traffic done
train.shape (902755, 45)
test.shape (803863, 45)



In [28]:
day_seconds = 24 * 60* 60
month_seconds = 30 * day_seconds
debug=False

transaction_features = ['days_from_last_visted','months_from_last_visted','last_visit_buy',
                        'countOfRevenuePerVisitNumRate','max_visit_number','sum_previous_pageviews',
                        'sum_previous_hits','transaction_count','buy_times']
advance_num_features += transaction_features


#################### Parallel run  #################### 
import tqdm

#CreateFeature the seconds from last visited
def create_feature_times_from_last_visited(group_df):
    days_from_last_array = []
    months_from_last_array = []
    
    visitId_array = group_df['visitId'].values
    last = visitId_array[0]
    for item in group_df['visitId'].values:
        days = int((item-last)/day_seconds)
        days_from_last_array.append(days)
        
        months = int((item-last)/month_seconds)
        months_from_last_array.append(months)
        
        last = item
        
        
    group_df['days_from_last_visted'] = days_from_last_array
    group_df['months_from_last_visted'] = months_from_last_array
    
    return group_df
    
    
    
#CreateFeature does last visited bought?
def create_feature_last_visited_bought(group_df):
    last_deal_array = []
    revenue_array = group_df['transactionRevenue'].values

    last_revenue = revenue_array[0]
    for i in range(len(revenue_array)):
        if i==0:
            last_deal_array.append(0)
        else:
            if revenue_array[i-1]>0:
                last_deal_array.append(1)
            else:
                last_deal_array.append(0)

    group_df['last_visit_buy'] = last_deal_array

    return group_df
    
# CreateFeature 
# count(transactionRevenue)/MAX(visitNumber)
def create_feature_countsOfvist_countsOfDeals(group_df):
    max_visit_num = np.max(group_df['visitNumber'])
    count_of_revenue = sum(group_df['transactionRevenue']>0)
    
    rate = count_of_revenue/max_visit_num
    
    group_df['countOfRevenuePerVisitNumRate'] = [rate]*group_df.shape[0]
    
    return group_df

def create_feature_max_visit_number(group_df):
    max_visit_num = np.max(group_df['visitNumber'])
    
    group_df['max_visit_number'] = [max_visit_num]*group_df.shape[0]
    
    return group_df

def create_feature_previous_sum_pageviews(group_df):
    sum_previous_pageviews = []
    sum = 0
    for item in group_df['pageviews'].values:
        sum += item
        sum_previous_pageviews.append(sum)
            
    group_df['sum_previous_pageviews'] = sum_previous_pageviews
    
    return group_df
    
def create_feature_previous_sum_hits(group_df):
    sum_previous_hits = []
    sum = 0
    for item in group_df['hits'].values:
        sum += item
        sum_previous_hits.append(sum)
            
    group_df['sum_previous_hits'] = sum_previous_hits
    
    return group_df
    
def create_feature_transaction_count(group_df):
    count = group_df.shape[0]
    
    group_df['transaction_count'] = [count] * group_df.shape[0]
    
    return group_df

def create_feature_buy_times(group_df):
    buy_times = sum(group_df['transactionRevenue']>0)
    
    group_df['buy_times'] = [buy_times] * group_df.shape[0]
    
    return group_df

target = 'total_revenue'

def create_feature_revenue(group_df):
    sum_revenue = np.log(sum(group_df['transactionRevenue']) + 1)
    
    if debug is True:
        print('method\n',group_df.shape,group_df['transactionRevenue'].values,
              sum(group_df['transactionRevenue']),sum_revenue)
    
    group_df[target] = [sum_revenue] * group_df.shape[0]
    
    return group_df


In [None]:
import multiprocessing

def process_one_group(group_df):
    group_df = create_feature_times_from_last_visited(group_df)
    group_df = create_feature_last_visited_bought(group_df)
    group_df = create_feature_countsOfvist_countsOfDeals(group_df)
    group_df = create_feature_max_visit_number(group_df)
    
#   pageviews
    group_df = create_feature_previous_sum_pageviews(group_df)
#   hits
    group_df = create_feature_previous_sum_hits(group_df)
#   transcation count
    group_df = create_feature_transaction_count(group_df)
#   buy times
    group_df = create_feature_buy_times(group_df)
#     sum of user revenue
    group_df = create_feature_revenue(group_df)
        
    return group_df
    

def parallel_batch_process_groups(grouped):
    print('Run parallel_batch_process_groups')
    
    batch = []
    first_batch=True
    result_df = pd.DataFrame()

    counter = 0
    total = len(grouped.size())
    for name, group_df in grouped:
        group_df_copy = group_df.copy()
        
        fullVisitorId = group_df_copy['fullVisitorId'].values[0]

        group_df_copy.sort_values(by='visitNumber', inplace=True)
        group_df_copy = process_one_group(group_df_copy)

        batch.append(group_df_copy)
        counter += 1
        del group_df_copy
        
        if len(batch)>=5000:
            print('progress report', multiprocessing.current_process().name,100.0*(counter/total))
            if first_batch is True:
                result_df = pd.concat(batch, axis=0)
                del batch
                batch = []
                first_batch = False
            else:
                batch_df = pd.concat(batch, axis=0)
                result_df = pd.concat([result_df, batch_df], axis=0)
                del batch
                del batch_df
                batch = []
        
    batch_df = pd.concat(batch, axis=0)
    
    if result_df.shape[0]>0:
        result_df = pd.concat([result_df, batch_df], axis=0)
    else:
        result_df = batch_df
    del batch
    del batch_df
            
    return result_df
    

def applyParallelInBatch(df, by_field, func):
    df_copy = df.copy()
    
    batch_size=8
    
    df_copy['hash_index'] = df_copy[by_field].apply(lambda x: hash(x) % batch_size)
    
    pool = multiprocessing.Pool(processes=batch_size)
    
    batch_result = []
    for i in range(batch_size):
        batch_df = df_copy[df_copy['hash_index'] == i]
        grouped = batch_df.groupby(by_field)
        
        batch_result.append(pool.apply_async(func, args=(grouped,)))
        print('apply async job to pool',batch_df.shape)
        
    print()
    print()
    pool.close()
    pool.join()
    
    result_df = pd.DataFrame(columns=df.columns)
    for i in batch_result:
        part_df = i.get()
        result_df = pd.concat([result_df,part_df], axis=0)
        print('got result result_df',result_df.shape)
            
    return result_df


def process_transactions(df):
    print('Parallel run applyParallelInBatch')
    input_columns = ['fullVisitorId','visitId','visitNumber','transactionRevenue','pageviews','hits']
    transaction_df = applyParallelInBatch(df[['sessionId'] + input_columns ],
                                          'fullVisitorId',
                                          parallel_batch_process_groups)
    transaction_df.drop(input_columns, axis=1, inplace=True)

    print('should have', len(df['fullVisitorId'].unique()))
    print('transaction_df.shape',transaction_df.shape)

    df = pd.merge(df,transaction_df, on='sessionId', how='left')
    print_mem(df)

    del transaction_df
    
    return df
    
train = process_transactions(train)
test = process_transactions(test)

print('process_transactions done')
print('train.shape',train.shape)
print('test.shape',test.shape)


Parallel run applyParallelInBatch
Run parallel_batch_process_groups
apply async job to pool (112368, 8)
apply async job to pool (112311, 8)
apply async job to pool (112944, 8)
apply async job to pool (112907, 8)
apply async job to pool (112889, 8)
Run parallel_batch_process_groups
Run parallel_batch_process_groups
apply async job to pool (113748, 8)
apply async job to pool (112785, 8)
apply async job to pool (112803, 8)


Run parallel_batch_process_groups
Run parallel_batch_process_groups
Run parallel_batch_process_groups
Run parallel_batch_process_groups
Run parallel_batch_process_groups
progress report ForkPoolWorker-42 5.612113185098717
progress report ForkPoolWorker-41 5.58890267484882
progress report ForkPoolWorker-43 5.576125262077887
progress report ForkPoolWorker-45 5.593404257699321
progress report ForkPoolWorker-44 5.619998201600575
progress report ForkPoolWorker-46 5.584096493187403
progress report ForkPoolWorker-47 5.608399138549893
progress report ForkPoolWorker-48 5.62480

In [None]:
train[['fullVisitorId','transaction_count','transactionRevenue']+advance_num_features].loc[train['transaction_count']>1].head(10)

In [None]:
train.loc[train['fullVisitorId']=='6664733704830724714'].T.head(100)

In [None]:
train[['total_revenue','countOfRevenuePerVisitNumRate']].loc[train['countOfRevenuePerVisitNumRate']>0]

In [None]:
# agg numeric features
def agg_numeric_feature(df):
    for col in basic_num_features:
        temp_df = df[[col,'fullVisitorId']].groupby('fullVisitorId')[col].agg(['min','max','mean','median'])
        temp_df['fullVisitorId'] = temp_df.index
        temp_df.rename(columns={
            'min': col +'_' +'min',
            'max': col +'_' +'max',
            'mean': col +'_' +'mean',
            'median': col +'_' +'median'
        }, inplace=True)

        df = pd.merge(df, temp_df, on='fullVisitorId')
        del temp_df

        print(col,df.shape)

    return df

train = agg_numeric_feature(train)
test = agg_numeric_feature(test)

print('agg_numeric_feature done')
print('train.shape',train.shape)
print('test.shape',test.shape)


In [None]:
# agg cate features
from pandas import Series

print('train.shape',train.shape)
print('test.shape',test.shape)

def applyParallel(dfGrouped, func, total):
    with Pool(cpu_count()) as p:
        ret_list = list(tqdm.tqdm(
            p.imap(func, [group for name, group in dfGrouped]), 
            total=total))

    df = pd.concat(ret_list, axis=1)
    del ret_list
    
    return df.T

def agg_cate_group_func(group_df):
    new_row = Series()
    new_row['fullVisitorId'] = group_df['fullVisitorId'].values[0]

    for col in cate_features:
        value_counts_series = group_df[col].value_counts()
        
        counts_array = value_counts_series.values
        length = len(counts_array)
        if length == 1:
            new_row[col + '_top_1'] = counts_array[0]
            new_row[col + '_top_2'] = counts_array[0] 
            new_row[col + '_top_3'] = counts_array[0]
        elif length == 2:
            new_row[col + '_top_1'] = counts_array[0]
            new_row[col + '_top_2'] = counts_array[1] 
            new_row[col + '_top_3'] = counts_array[1]
        else:
            new_row[col + '_top_1'] = counts_array[0]
            new_row[col + '_top_2'] = counts_array[1] 
            new_row[col + '_top_3'] = counts_array[2]
            
        del value_counts_series
        del counts_array
    return new_row


def expand_cate_features(df):
    d_rows = df[df['fullVisitorId'].duplicated(keep=False)]
    revisted_df = df.loc[d_rows.index]
    
    total = len(revisted_df['fullVisitorId'].unique())
    print('revisted_df',total,revisted_df.shape)
    
    cate_group_data = revisted_df[cate_features + ['fullVisitorId']].groupby('fullVisitorId')

    revisted_cates_top_3_df = applyParallel(cate_group_data, agg_cate_group_func, total)
    print('got revisted_cates_top_3_df', revisted_cates_top_3_df.shape)

    df = pd.merge(df, revisted_cates_top_3_df, on='fullVisitorId', how='left')
    print('df merge cate_top_3_features_df done')
    
    del cate_group_data
    del revisted_cates_top_3_df

    return df
    
train = expand_cate_features(train)
test = expand_cate_features(test)

print('process expand_cate_features done')
print('train.shape',train.shape)
print('test.shape',test.shape)


###################cate_features_fillna########################## 
def cate_features_fillna(df):
    for col in cate_features:
        top_1_null_index = df[df[col + '_top_1'].isnull()].index
        df[col + '_top_1'].fillna(
            df[col].loc[top_1_null_index], inplace=True)
        
        top_2_null_index = df[df[col + '_top_2'].isnull()].index
        df[col + '_top_2'].fillna(
            df[col].loc[top_2_null_index], inplace=True)
        
        top_3_null_index = df[df[col + '_top_3'].isnull()].index
        df[col + '_top_3'].fillna(
            df[col].loc[top_3_null_index], inplace=True)
        print('cate_features_fillna', col)
        
        
    return df
        
        
train = cate_features_fillna(train)
test = cate_features_fillna(test)
print('process cate_features_fillna done')


In [None]:
print('train.shape',train.shape)
print('test.shape',test.shape)

new_cate_features = [] 
for col in cate_features:
    new_cate_features.append(col + '_top_1')
    new_cate_features.append(col + '_top_2')
    new_cate_features.append(col + '_top_3')
    
cate_features += new_cate_features


new_numeric_features = []
for col in basic_num_features:
    new_numeric_features.append(col+'_' +'min')
    new_numeric_features.append(col+'_' +'max')
    new_numeric_features.append(col+'_' +'mean')
    new_numeric_features.append(col+'_' +'median')
    

print()
print('cate_features', len(cate_features), cate_features)
print('basic_num_features', len(basic_num_features), basic_num_features)

In [None]:
basic_num_features += advance_num_features

# basic_num_features += new_numeric_features

# new_numeric_features
# basic_num_features = list(set(basic_num_features)-set(new_numeric_features))

print(basic_num_features)

In [None]:
train[basic_num_features].T[0:100]

In [None]:
from keras.layers import Input, Dense, concatenate, Dropout, Embedding, Flatten
from keras.models import Model
from keras import optimizers


def get_model():
    input_cate = Input((len(cate_features),))
    input_numeric = Input((len(basic_num_features),))
    
    x_cate = Embedding(50000, 10)(input_cate)
    x_cate = Flatten()(x_cate)
    x_cate = Dropout(0.2)(x_cate)
    x_cate = Dense(500, activation='relu')(x_cate)
    
    
    x_numeric = Dense(500, activation='relu')(input_numeric)
    x_numeric = Dropout(0.2)(x_numeric)
    
    x = concatenate([x_cate,x_numeric])
    
    x = Dense(100, activation='relu')(x)
    x = Dropout(0.2)(x)
    
    output = Dense(1, kernel_initializer='normal')(x)
    
    model = Model(inputs=[input_cate,input_numeric], outputs=output)
    model.compile(optimizer=optimizers.Adam(), loss='mean_squared_error', metrics=['mse'])
    
    return model
    
    
model = get_model()
print(model.summary())
model.fit([train[cate_features],train[basic_num_features]], train[target], validation_split=0.2, 
          epochs=3, batch_size=10)
print('model.fit done')


In [None]:
print('test.shape',test.shape)
predict_test = model.predict([test[cate_features],test[basic_num_features]],verbose=1)

test['PredictedLogRevenue'] = predict_test

predict_df = test[['PredictedLogRevenue','fullVisitorId']].groupby('fullVisitorId').agg('mean')
predict_df['fullVisitorId'] = predict_df.index


predict_df[['fullVisitorId','PredictedLogRevenue']].to_csv('GStore_keras_single_row_model.csv', index=False)
print(predict_df.shape)

predict_df.head()

In [None]:
predict_df.loc[predict_df['PredictedLogRevenue']>1]