In [1]:
import sys
import numpy as np
from pyspark.context import SparkContext
from pyspark import SparkConf
from pyspark.sql.types import DecimalType
import pyspark.sql.types as types
import pyspark.sql.functions as functions
from pyspark.sql import Window
from pyspark import StorageLevel

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
37,application_1564465438466_0049,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


# ETL Job 参数设置

In [2]:
DF_PARTITIONS_NUM = 50
OUTPUT_CSV = 's3://0-glue-test-jyhy/runqi_sequence_features_incomplete_csv'
GLUE_DB = "0-glue-test-db0"
GLUE_TABLE = "logpool_300_users0"

# Field names of one tab-separated log line: nine named leading fields followed by the
# generic payload fields x1 .. x42 (51 fields in total).
columns_names = ['msgtype', 'time', 'ip', 'device_id', 'email_id', 'user_id', 'adjust_adid', 'adjust_idfa', 'adjust_gps_adid'] \
    + ['x{}'.format(field_no) for field_no in range(1, 43)]

VBox()

# 读取数据，做成DataFrame，cache下来

In [3]:
# Alternative source: read the table through the Glue Data Catalog instead of raw S3 text.
# glueContext = GlueContext(SparkContext.getOrCreate())
# datasource = glueContext.create_dynamic_frame.from_catalog(
# database=GLUE_DB,
# table_name=GLUE_TABLE)
# # Convert to a DataFrame
# df_all = datasource.toDF()

# One row per raw log line, held in the single string column `pool`.
df_all = sc.textFile('s3://0-glue-test-jyhy/origin_data/demo3/*/*', use_unicode=True).map(lambda x: (x,)).toDF(['pool'])
# df_all = sc.textFile('s3://0-glue-test-jyhy/origin_data/users0/*/*.user', minPartitions=100, use_unicode=True).map(lambda x: (x,)).toDF(['pool'])
df_all.cache()
df_all.count()
# Split each log line on tabs and name field i after columns_names[i]. Iterating over
# columns_names (instead of a hard-coded range(0, 51)) keeps the split in sync with the
# schema defined in the config cell.
df_split = df_all.withColumn("s", functions.split(functions.col('pool'), "\t"))
selected_cols = [df_split['s'].getItem(i).alias(name) for i, name in enumerate(columns_names)]
df_stage0 = df_split.select(*selected_cols)\
                .withColumnRenamed('user_id', 'user')\
                .withColumn("time", functions.col('time').cast(types.LongType()))\
                .withColumn("user", functions.col('user').cast(types.IntegerType()))

# Convert to relative time: subtract each user's earliest timestamp (kept in start_time).
df_stage0 = df_stage0.withColumn("start_time", functions.min('time').over(Window.partitionBy('user')))\
                    .withColumn('time', functions.col('time') - functions.col('start_time')).repartition('user')

df_stage0.cache()
df_stage0.count()
df_all.unpersist()
# df_stage0.show()


VBox()

DataFrame[pool: string]

# 序列特征提取相关方法

In [4]:
def _mark_sequence_boundaries(df_origin, cate_col, ord_col, par_col, par_start, par_end):
    """Tag each row with the two counters that identify the subsequence it belongs to.

    Within each ``cate_col`` group:
      * ``start_accu`` = number of rows with ``par_col == par_start`` at or before this row
        (ordered by ``ord_col`` ascending);
      * ``end_accu``   = number of rows with ``par_col == par_end`` at or after this row
        (ordered by ``ord_col`` descending).
    Rows sharing the same (cate_col, start_accu, end_accu) form one subsequence. Rows before
    the first ``par_start`` get ``start_accu == 0``; rows after the last ``par_end`` get
    ``end_accu == 0``.

    The windows use Spark's default frame for an ordered window (range frame up to the
    current row), so rows tied on ``ord_col`` see the same counter values.

    Returns:
        ``df_origin`` with the extra columns ``start_accu`` and ``end_accu``.
    """
    ascending = Window.partitionBy(cate_col).orderBy(ord_col)
    descending = Window.partitionBy(cate_col).orderBy(functions.col(ord_col).desc())
    return df_origin\
        .withColumn("start_mark", functions.when(functions.col(par_col) == par_start, 1).otherwise(0))\
        .withColumn("end_mark", functions.when(functions.col(par_col) == par_end, 1).otherwise(0))\
        .withColumn("start_accu", functions.sum('start_mark').over(ascending))\
        .withColumn("end_accu", functions.sum('end_mark').over(descending))\
        .drop('start_mark', 'end_mark')

def sequence_partition_allow_incomplete(df_origin, cate_col, ord_col, par_col, par_start, par_end):
    """Label every row with start_accu/end_accu and keep all rows, including incomplete edges."""
    return _mark_sequence_boundaries(df_origin, cate_col, ord_col, par_col, par_start, par_end)

def sequence_partition_allow_incomplete_rmv_head_side(df_origin, cate_col, ord_col, par_col, par_start, par_end):
    """Like sequence_partition_allow_incomplete, but drop rows before the first ``par_start`` (start_accu == 0)."""
    return _mark_sequence_boundaries(df_origin, cate_col, ord_col, par_col, par_start, par_end)\
        .filter("start_accu!=0")

def sequence_partition_allow_incomplete_rmv_both_sides(df_origin, cate_col, ord_col, par_col, par_start, par_end):
    """Drop rows before the first ``par_start`` (start_accu == 0) and after the last ``par_end`` (end_accu == 0)."""
    return _mark_sequence_boundaries(df_origin, cate_col, ord_col, par_col, par_start, par_end)\
        .filter("start_accu!=0 and end_accu!=0")

def sequence_partition_must_complete(df_origin, cate_col, ord_col, par_col, par_start, par_end):
    """Keep only complete ``par_start`` ... ``par_end`` subsequences.

    A row is kept when its start counter is the largest one seen within its end group and
    its end counter is the largest one seen within its start group. In other words, the row
    lies between a ``par_start`` and the first ``par_end`` after it, with no other
    ``par_start`` in between. Stray repeated starts/ends and the incomplete head/tail
    segments are discarded.

    Returns:
        ``df_origin`` plus ``start_accu`` and ``end_accu``, filtered as described.
    """
    whole_group = (Window.unboundedPreceding, Window.unboundedFollowing)
    return _mark_sequence_boundaries(df_origin, cate_col, ord_col, par_col, par_start, par_end)\
        .withColumn("max_start_each_end", functions.max('start_accu').over(Window.partitionBy(cate_col, 'end_accu').rowsBetween(*whole_group)))\
        .withColumn("max_end_each_start", functions.max('end_accu').over(Window.partitionBy(cate_col, 'start_accu').rowsBetween(*whole_group)))\
        .filter("start_accu!=0 and end_accu!=0 and start_accu=max_start_each_end and end_accu=max_end_each_start")\
        .drop('max_start_each_end', 'max_end_each_start')

def subsequence_one_feature_one_column_count(df_origin, tar_col, feature, col_name_prefix):
    """Count, per (user, start_accu, end_accu) subsequence, the rows where ``tar_col == feature``.

    Returns:
        A DataFrame with the three group keys and one column ``<prefix><feature>_count``.
    """
    count_name = '{}{}_count'.format(col_name_prefix, feature)
    is_match = functions.when(functions.col(tar_col) == feature, 1).otherwise(0)
    flagged = df_origin.withColumn('flag', is_match)
    return flagged.groupBy('user', 'start_accu', 'end_accu').agg(functions.sum('flag').alias(count_name))

def subsequence_multi_features_one_column_count(df_origin, tar_col, features, col_name_prefix):
    """Count, per (user, start_accu, end_accu) subsequence, rows whose ``tar_col`` equals each feature.

    Args:
        df_origin: DataFrame carrying ``user``, ``start_accu`` and ``end_accu``.
        tar_col: name of the column compared against each feature.
        features: values to count; one output column per value.
        col_name_prefix: prefix for the output column names.

    Returns:
        The three group keys plus one ``<prefix><feature>_count`` column per feature, in the
        order of ``features``. With no features, only the distinct group keys are returned.
    """
    group_keys = ('user', 'start_accu', 'end_accu')
    if not features:
        # GroupedData.agg() rejects an empty expression list; with nothing to count the
        # result is just the distinct subsequence keys.
        return df_origin.select(*group_keys).distinct()
    # Aggregate the flag expressions directly instead of materialising '<prefix><feature>_flag'
    # columns and looking them up by name again. Feature values come from the data, and a
    # value containing '.' or '`' would make that string reference unresolvable (Spark parses
    # it as a nested-field path).
    count_exprs = [
        functions.sum(functions.when(functions.col(tar_col) == feature, 1).otherwise(0))
        .alias('{}{}_count'.format(col_name_prefix, feature))
        for feature in features
    ]
    return df_origin.groupBy(*group_keys).agg(*count_exprs)

def multi_col_feature_check_function(tar_cols, features, yes_label=1, no_label=0):
    """Build a Column that is ``yes_label`` when every ``tar_cols[i] == features[i]`` holds, else ``no_label``.

    Args:
        tar_cols: column names to test, paired position-wise with ``features``.
        features: the value each corresponding column must equal.
        yes_label: value produced when all comparisons are true.
        no_label: value produced for every other row, including rows where a compared
            column is NULL.

    Raises:
        ValueError: if ``tar_cols`` and ``features`` differ in length.
    """
    if len(tar_cols) != len(features):
        raise ValueError('tar_cols and features must have the same length, got {} and {}'.format(len(tar_cols), len(features)))
    # Fold all comparisons into one condition so a single otherwise() covers every
    # non-matching row. The nested when() chain from the recursive helper only had an
    # otherwise() on its outermost level, so a row matching the last pair but not an
    # earlier one evaluated to NULL instead of no_label.
    condition = functions.lit(True)
    for col_name, feature in zip(tar_cols, features):
        condition = condition & (functions.col(col_name) == feature)
    return functions.when(condition, yes_label).otherwise(no_label)

def multi_col_feature_check_function_inside_recursive(tar_cols, features, yes_label):
    """Nest one ``when(tar_cols[i] == features[i], ...)`` per column pair, the innermost yielding ``yes_label``.

    Kept for backward compatibility. The inner levels have no ``otherwise()``, so they
    evaluate to NULL when their comparison fails; only a caller-supplied ``.otherwise()``
    on the returned outermost Column applies. With empty ``tar_cols`` this returns
    ``yes_label`` itself, not a Column.
    """
    if len(tar_cols) == 0:
        return yes_label
    else:
        return functions.when(functions.col(tar_cols[-1]) == features[-1], multi_col_feature_check_function_inside_recursive(tar_cols[:-1], features[:-1], yes_label))

def subsequence_multi_features_multi_columns_count(df_origin, tar_cols_collect, features_collect, col_name_prefix):
    """Count, per (user, start_accu, end_accu) subsequence, rows matching each multi-column feature.

    Args:
        df_origin: DataFrame carrying ``user``, ``start_accu`` and ``end_accu``.
        tar_cols_collect: list of column-name lists, one per feature.
        features_collect: list of value lists, paired position-wise with ``tar_cols_collect``.
        col_name_prefix: prefix for the output column names.

    Returns:
        The three group keys plus one ``<prefix><str(features)>_count`` column per feature,
        in input order. With no features, only the distinct group keys are returned.

    Raises:
        ValueError: if the two collections differ in length.
    """
    if len(tar_cols_collect) != len(features_collect):
        raise ValueError('tar_cols_collect and features_collect must have the same length, got {} and {}'.format(len(tar_cols_collect), len(features_collect)))
    group_keys = ('user', 'start_accu', 'end_accu')
    if not features_collect:
        # GroupedData.agg() rejects an empty expression list.
        return df_origin.select(*group_keys).distinct()
    # Aggregate the match expressions directly. The previous version aliased them as
    # '<prefix><str(features)>_flag' and re-read them by name, which breaks when a feature
    # value contains '.' or '`' (Spark parses the string as a nested-field path).
    count_exprs = [
        functions.sum(multi_col_feature_check_function(tar_cols, features, yes_label=1, no_label=0))
        .alias('{}{}_count'.format(col_name_prefix, str(features)))
        for tar_cols, features in zip(tar_cols_collect, features_collect)
    ]
    return df_origin.groupBy(*group_keys).agg(*count_exprs)

def subsequence_multi_features_multi_columns_one_col_sum_value(df_origin, tar_cols_collect, features_collect, val_col, col_name_prefix):
    """Sum ``val_col``, per (user, start_accu, end_accu) subsequence, over rows matching each multi-column feature.

    Args:
        df_origin: DataFrame carrying ``user``, ``start_accu``, ``end_accu`` and a numeric ``val_col``.
        tar_cols_collect: list of column-name lists, one per feature.
        features_collect: list of value lists, paired position-wise with ``tar_cols_collect``.
        val_col: name of the column whose values are summed.
        col_name_prefix: prefix for the output column names.

    Returns:
        The three group keys plus one ``<prefix><str(features)>_sum`` column per feature,
        in input order. With no features, only the distinct group keys are returned.

    Raises:
        ValueError: if the two collections differ in length.
    """
    if len(tar_cols_collect) != len(features_collect):
        raise ValueError('tar_cols_collect and features_collect must have the same length, got {} and {}'.format(len(tar_cols_collect), len(features_collect)))
    group_keys = ('user', 'start_accu', 'end_accu')
    if not features_collect:
        # GroupedData.agg() rejects an empty expression list.
        return df_origin.select(*group_keys).distinct()
    # Multiply the 1/0 match expression by val_col and sum it directly. The previous version
    # round-tripped through '<prefix><str(features)>_flag' / '_val' columns referenced by
    # name, which breaks when a feature value contains '.' or '`'.
    sum_exprs = [
        functions.sum(multi_col_feature_check_function(tar_cols, features, yes_label=1, no_label=0) * functions.col(val_col))
        .alias('{}{}_sum'.format(col_name_prefix, str(features)))
        for tar_cols, features in zip(tar_cols_collect, features_collect)
    ]
    return df_origin.groupBy(*group_keys).agg(*sum_exprs)

def subsequence_time_length(df_origin, tar_col, col_name_prefix):
    """Report, per (user, start_accu, end_accu) subsequence, the span covered by ``tar_col``.

    Returns:
        The three group keys followed by ``<prefix>time_end`` (max of tar_col),
        ``<prefix>time_start`` (min of tar_col) and ``<prefix>time_duration`` (max - min).
    """
    latest = functions.max(tar_col)
    earliest = functions.min(tar_col)
    return df_origin.groupBy('user', 'start_accu', 'end_accu').agg(
        latest.alias('{}time_end'.format(col_name_prefix)),
        earliest.alias('{}time_start'.format(col_name_prefix)),
        (latest - earliest).alias('{}time_duration'.format(col_name_prefix)),
    )



VBox()

# 按 login/logout 切分序列，过滤掉无效的msgtype，并缓存结果

In [5]:
# Stricter alternative: keep only complete login ... logout sequences.
# df_partitioned = sequence_partition_must_complete(df_stage0, 'user', 'time', 'msgtype', 'login', 'logout')
df_partitioned = sequence_partition_allow_incomplete(
    df_stage0, cate_col='user', ord_col='time', par_col='msgtype', par_start='login', par_end='logout'
)
# Drop rows with an invalid msgtype: values of 50 or more characters, plus nulls
# (a null length fails the comparison and the row is filtered out).
df_partitioned = df_partitioned.where(functions.length('msgtype') < 50)
df_partitioned.cache()
df_partitioned.count()

df_stage0.unpersist()

VBox()

DataFrame[msgtype: string, time: bigint, ip: string, device_id: string, email_id: string, user: int, adjust_adid: string, adjust_idfa: string, adjust_gps_adid: string, x1: string, x2: string, x3: string, x4: string, x5: string, x6: string, x7: string, x8: string, x9: string, x10: string, x11: string, x12: string, x13: string, x14: string, x15: string, x16: string, x17: string, x18: string, x19: string, x20: string, x21: string, x22: string, x23: string, x24: string, x25: string, x26: string, x27: string, x28: string, x29: string, x30: string, x31: string, x32: string, x33: string, x34: string, x35: string, x36: string, x37: string, x38: string, x39: string, x40: string, x41: string, x42: string, start_time: bigint]

In [6]:
# Feature tables keyed by name; every value is left-joined onto the subsequence keys at the end.
df_join_collection = dict()

VBox()

# 各阶段msgtype 各类型数量统计

In [7]:
# The sequence delimiters themselves are not counted as features.
not_included_features = ['login', 'logout']
# Distinct msgtypes present in the data, minus the delimiters. Filtering (instead of
# list.remove) means a dataset without any 'login' or 'logout' rows no longer raises
# ValueError. Sorting fixes the feature-column order across runs, since the order of
# distinct().collect() is arbitrary; the key keeps a possible None from breaking the sort.
msgtype_collection = sorted(
    (row.msgtype for row in df_partitioned.select('msgtype').distinct().collect()
     if row.msgtype not in not_included_features),
    key=lambda value: (value is None, value or ''),
)

df_msgtype_count = subsequence_multi_features_one_column_count(df_partitioned, 'msgtype', msgtype_collection, 'msgtype_')
df_msgtype_count.cache()
df_msgtype_count.count()
df_join_collection['msgtype_count'] = df_msgtype_count
# df_msgtype_count.show()


VBox()

# 各阶段游戏时长统计

In [8]:
# Per-subsequence first/last relative timestamp and their difference, prefixed with 'game_'.
df_game_time = subsequence_time_length(df_partitioned, tar_col='time', col_name_prefix='game_')
df_game_time.cache()
df_game_time.count()
# df_game_time.show()
df_join_collection['game_duration'] = df_game_time

VBox()

# 各阶段money_change 类型数量统计

In [9]:
col_prefix = 'money_change_types_'
money_change_rows = df_partitioned.filter("msgtype='money_change'").select('msgtype', 'x1').distinct().collect()
money_change_types = [row.x1 for row in money_change_rows]
# One (msgtype, x1) feature per money_change subtype found in the data.
tar_cols_collect = [['msgtype', 'x1'] for _ in money_change_types]
features_collect = [['money_change', change_type] for change_type in money_change_types]
df_money_change_types_count = subsequence_multi_features_multi_columns_count(df_partitioned, tar_cols_collect, features_collect, col_prefix)
df_money_change_types_count.cache()
df_money_change_types_count.count()
df_join_collection['money_change_types_count'] = df_money_change_types_count


VBox()

# 各阶段money_change金钱变化统计

In [10]:
col_prefix = 'money_change_types_'
money_change_rows = df_partitioned.filter("msgtype='money_change'").select('msgtype', 'x1').distinct().collect()
money_change_types = [row.x1 for row in money_change_rows]
tar_cols_collect = [['msgtype', 'x1'] for _ in money_change_types]
features_collect = [['money_change', change_type] for change_type in money_change_types]

# x2 is cast to a long so it can be summed per money_change subtype
# (NOTE(review): presumably x2 is the amount of money changed — confirm against the log format).
df_partitioned_temp  = df_partitioned.withColumn('x2', functions.col('x2').cast(types.LongType()))
df_partitioned_temp.cache()
df_partitioned_temp.count()
df_money_change_types_sum = subsequence_multi_features_multi_columns_one_col_sum_value(
    df_partitioned_temp, tar_cols_collect, features_collect, 'x2', col_prefix
)
df_money_change_types_sum.cache()
df_money_change_types_sum.count()
df_join_collection['money_change_types_sum'] = df_money_change_types_sum
# df_money_change_types_sum.show()
# Sanity check for one user / subtype:
# df_partitioned_temp.filter("user=1244915 and msgtype='money_change' and x1='spin'").select('user', 'start_accu', 'end_accu', 'msgtype', 'x1', 'x2').groupBy('user', 'start_accu', 'end_accu').sum('x2').show()

df_partitioned_temp.unpersist()

VBox()

DataFrame[msgtype: string, time: bigint, ip: string, device_id: string, email_id: string, user: int, adjust_adid: string, adjust_idfa: string, adjust_gps_adid: string, x1: string, x2: bigint, x3: string, x4: string, x5: string, x6: string, x7: string, x8: string, x9: string, x10: string, x11: string, x12: string, x13: string, x14: string, x15: string, x16: string, x17: string, x18: string, x19: string, x20: string, x21: string, x22: string, x23: string, x24: string, x25: string, x26: string, x27: string, x28: string, x29: string, x30: string, x31: string, x32: string, x33: string, x34: string, x35: string, x36: string, x37: string, x38: string, x39: string, x40: string, x41: string, x42: string, start_time: bigint, start_accu: bigint, end_accu: bigint]

# 各阶段 系统奖励类型数量统计

In [11]:
col_prefix = 'sysgift_get_'
sysgift_rows = df_partitioned.filter("msgtype='sys_gift_get'").select('x1').distinct().collect()
sysgift_types = [row.x1 for row in sysgift_rows]
# One (msgtype, x1) feature per system-gift subtype found in the data.
tar_cols_collect = [['msgtype', 'x1'] for _ in sysgift_types]
features_collect = [['sys_gift_get', gift_type] for gift_type in sysgift_types]
df_sysgift_types_count = subsequence_multi_features_multi_columns_count(df_partitioned, tar_cols_collect, features_collect, col_prefix)
df_sysgift_types_count.cache()
df_sysgift_types_count.count()
df_join_collection['sysgift_types_count'] = df_sysgift_types_count

VBox()

# 各阶段胜率统计

In [12]:
# df_partitioned.filter("msgtype='spin'").show(100, truncate= False)
# x22 is cast to a number, and a spin counts as a win when x22 > 0
# (NOTE(review): assumes x22 is the spin's win amount — confirm against the log format).
# The win flag is a single conjunction with an explicit otherwise(0). The previous nested
# when(spin, when(x22 > 0, 1)) produced NULL for losing spins, so a subsequence made only of
# losing spins reported spin_win and spin_win_rate as NULL instead of 0.
# spin_win_rate is spin_win / spin_all; subsequences without spins divide by zero, which
# yields NULL under Spark's default (non-ANSI) arithmetic.
df_spin_wins = df_partitioned\
    .withColumn('x22', functions.col('x22').cast(types.LongType()))\
    .withColumn('spin_flag', functions.when(functions.col('msgtype') == 'spin', 1).otherwise(0))\
    .withColumn('spin_win', functions.when((functions.col('spin_flag') == 1) & (functions.col('x22') > 0), 1).otherwise(0))\
    .groupBy('user', 'start_accu', 'end_accu')\
    .agg(functions.sum('spin_flag').alias('spin_all'), functions.sum('spin_win').alias('spin_win'))\
    .withColumn('spin_win_rate', functions.col('spin_win') / functions.col('spin_all'))
df_spin_wins.cache()
df_spin_wins.count()
df_join_collection['spin_win'] = df_spin_wins
# df_spin_wins.show()

VBox()

# join 在一起

In [13]:
# Start from every distinct subsequence key and left-join each feature table onto it,
# in the order the tables were added to df_join_collection.
join_keys = ['user', 'start_accu', 'end_accu']
base_table = df_partitioned.select(*join_keys).distinct()
base_table.cache()
# base_table.show()
df_features_all = base_table
for feature_df in df_join_collection.values():
    df_features_all = df_features_all.join(feature_df, join_keys, 'left')
df_features_all.cache()
df_features_all.show()

VBox()

+-------+----------+--------+-------------+---------------+------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+---------------------------------------------------+-------------------------------------------------------------------+--------------------------------------------------------+----------------------------------------------------------------+--------------------------------------------------------+--------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+------------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------+------------------------------------------------------------+--

# 写入csv

In [14]:
# Collapse to a single partition so OUTPUT_CSV holds one CSV part file with a header row.
# NOTE(review): the default save mode raises an error if OUTPUT_CSV already exists, so this
# cell fails on a re-run until the previous output is removed.
df_features_write = df_features_all.coalesce(numPartitions=1)
df_features_write.write.csv(path=OUTPUT_CSV, header=True)

VBox()

In [7]:
# Ad-hoc inspection: display the first rows of raw 'spin' events.
spin_mask = functions.col('msgtype') == 'spin'
df_stage0.where(spin_mask).show()

VBox()

+-------+-------+---+-------------------+--------+-------+--------------------+-----------+--------------------+---------+----+-----+---+-------+---+------+--------+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+---------+---+---------+--------------------+---+---+---+-----+-----+---+---+---+---+---+---+---+---+---+---+----+-------------+
|msgtype|   time| ip|          device_id|email_id|   user|         adjust_adid|adjust_idfa|     adjust_gps_adid|       x1|  x2|   x3| x4|     x5| x6|    x7|      x8| x9|x10|x11|x12|x13|x14|x15|x16|x17|x18|x19|x20|x21|     x22|      x23|x24|      x25|                 x26|x27|x28|x29|  x30|  x31|x32|x33|x34|x35|x36|x37|x38|x39|x40|x41| x42|   start_time|
+-------+-------+---+-------------------+--------+-------+--------------------+-----------+--------------------+---------+----+-----+---+-------+---+------+--------+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+---------+---+---------+--------------------+---+---+---+-----+-