In [1]:
"""
特征工程Demo -- 单条事件的特征

数据说明：这是一份人造的交易数据，每一条数据是一张银行卡的一笔刷卡交易

字段说明：
Agn_Org_Cnr_Cd：交易所属国家的代码
Clr_Amt：交易金额（美元）
Exg_Plf_Sys_Srl_Nbr：交易流水号，每一笔交易的唯一标识
Main_EAC_Id：银行卡号，已脱敏
Mch_Id：商户编号
Mch_Typ_Cd：商户类型编码
Rcd_Dt：交易日期
Rcd_Tm：交易时间
Trx_Typ_Cd：交易类型，CWD表示取现，PER表示POS机刷卡

"""

'\n\xe7\x89\xb9\xe5\xbe\x81\xe5\xb7\xa5\xe7\xa8\x8bDemo -- \xe5\x8d\x95\xe6\x9d\xa1\xe4\xba\x8b\xe4\xbb\xb6\xe7\x9a\x84\xe7\x89\xb9\xe5\xbe\x81\n\n\xe6\x95\xb0\xe6\x8d\xae\xe8\xaf\xb4\xe6\x98\x8e\xef\xbc\x9a\xe8\xbf\x99\xe6\x98\xaf\xe4\xb8\x80\xe4\xbb\xbd\xe4\xba\xba\xe9\x80\xa0\xe7\x9a\x84\xe4\xba\xa4\xe6\x98\x93\xe6\x95\xb0\xe6\x8d\xae\xef\xbc\x8c\xe6\xaf\x8f\xe4\xb8\x80\xe6\x9d\xa1\xe6\x95\xb0\xe6\x8d\xae\xe6\x98\xaf\xe4\xb8\x80\xe5\xbc\xa0\xe9\x93\xb6\xe8\xa1\x8c\xe5\x8d\xa1\xe7\x9a\x84\xe4\xb8\x80\xe7\xac\x94\xe5\x88\xb7\xe5\x8d\xa1\xe4\xba\xa4\xe6\x98\x93\n\n\xe5\xad\x97\xe6\xae\xb5\xe8\xaf\xb4\xe6\x98\x8e\xef\xbc\x9a\nAgn_Org_Cnr_Cd\xef\xbc\x9a\xe4\xba\xa4\xe6\x98\x93\xe6\x89\x80\xe5\xb1\x9e\xe5\x9b\xbd\xe5\xae\xb6\xe7\x9a\x84\xe4\xbb\xa3\xe7\xa0\x81\nClr_Amt\xef\xbc\x9a\xe4\xba\xa4\xe6\x98\x93\xe9\x87\x91\xe9\xa2\x9d\xef\xbc\x88\xe7\xbe\x8e\xe5\x85\x83\xef\xbc\x89\nExg_Plf_Sys_Srl_Nbr\xef\xbc\x9a\xe4\xba\xa4\xe6\x98\x93\xe6\xb5\x81\xe6\xb0\xb4\xe5\x8f\xb7\xef\xbc\x8c\xe6\xaf\x8

In [2]:
# 拓宽notebook
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [52]:
import json
import time
import datetime
import numpy as np
import math
from scipy.stats import describe

import pandas as pd
pd.set_option('display.max_rows',None)
pd.set_option('display.max_columns', None)

In [88]:
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark import SparkConf, StorageLevel
from pyspark.sql.types import *
from pyspark.sql import functions

conf = SparkConf().setMaster("yarn-client").setAppName("feature single")  # 集群模式
# conf = SparkConf().setMaster("local[*]").setAppName("feature single") # local模式
conf.set("spark.executor.instances", 10)
conf.set("spark.executor.memory", "5g")
conf.set("spark.executor.cores","1")
conf.set("spark.driver.memory", "5g")

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [5]:
# 读数据
df = spark.read.parquet("/data/fresh_train/df_feature_single_event")
df.count()

13851968

In [12]:
df.dtypes

[('Agn_Org_Cnr_Cd', 'string'),
 ('Clr_Amt', 'string'),
 ('Exg_Plf_Sys_Srl_Nbr', 'string'),
 ('Main_EAC_Id', 'string'),
 ('Mch_Id', 'string'),
 ('Mch_Typ_Cd', 'string'),
 ('Rcd_Dt', 'string'),
 ('Rcd_Tm', 'string'),
 ('Trx_Typ_Cd', 'string'),
 ('timestamp', 'string')]

In [7]:
# 数据总量
df.count()

13851968

In [8]:
# check 流水号是否有重复
df.select("Exg_Plf_Sys_Srl_Nbr").drop_duplicates().count()

13851968

In [9]:
# Main_EAC_Id 数量
df.select("Main_EAC_Id").drop_duplicates().count()

4687513

In [10]:
# 预览几条数据
df.limit(5).toPandas()

Unnamed: 0,Agn_Org_Cnr_Cd,Clr_Amt,Exg_Plf_Sys_Srl_Nbr,Main_EAC_Id,Mch_Id,Mch_Typ_Cd,Rcd_Dt,Rcd_Tm,Trx_Typ_Cd
0,734,8456.89,49180,O6y2ExaDiS3zcTt,1883638063,7720,2017-01-02,04:23:28,CWD
1,730,7969.15,49230,3K6OPAAK7GBPDOz,9908690440,5161,2017-01-02,04:25:09,PER
2,815,6407.78,49280,n2YXilX5Te7CiDU,5784823589,8521,2017-01-02,04:26:45,PER
3,770,5062.58,49330,0Rc6NTM4zMIKvJD,8445948935,5253,2017-01-02,04:28:20,PER
4,595,5172.16,49380,hIQWBnm9K7Xyqfr,5237054012,4373,2017-01-02,04:30:03,PER


In [11]:
# 新增一列：年月，并统计这一列的分布
df = df.withColumn("timestamp", functions.concat_ws(" ", "Rcd_Dt", "Rcd_Tm"))
df.show(5)

+--------------+-------+-------------------+---------------+----------+----------+----------+--------+----------+-------------------+
|Agn_Org_Cnr_Cd|Clr_Amt|Exg_Plf_Sys_Srl_Nbr|    Main_EAC_Id|    Mch_Id|Mch_Typ_Cd|    Rcd_Dt|  Rcd_Tm|Trx_Typ_Cd|          timestamp|
+--------------+-------+-------------------+---------------+----------+----------+----------+--------+----------+-------------------+
|           734|8456.89|    000000000049180|O6y2ExaDiS3zcTt|1883638063|      7720|2017-01-02|04:23:28|       CWD|2017-01-02 04:23:28|
|           730|7969.15|    000000000049230|3K6OPAAK7GBPDOz|9908690440|      5161|2017-01-02|04:25:09|       PER|2017-01-02 04:25:09|
|           815|6407.78|    000000000049280|n2YXilX5Te7CiDU|5784823589|      8521|2017-01-02|04:26:45|       PER|2017-01-02 04:26:45|
|           770|5062.58|    000000000049330|0Rc6NTM4zMIKvJD|8445948935|      5253|2017-01-02|04:28:20|       PER|2017-01-02 04:28:20|
|           595|5172.16|    000000000049380|hIQWBnm9K7Xyqfr|52

### 任务1：drop掉Rcd_Dt Rcd_Tm, 把Clr_Amt字段改为double型

In [15]:
df = df.drop("Rcd_Dt", "Rcd_Tm")
df.show(5)

+--------------+-------+-------------------+---------------+----------+----------+----------+-------------------+
|Agn_Org_Cnr_Cd|Clr_Amt|Exg_Plf_Sys_Srl_Nbr|    Main_EAC_Id|    Mch_Id|Mch_Typ_Cd|Trx_Typ_Cd|          timestamp|
+--------------+-------+-------------------+---------------+----------+----------+----------+-------------------+
|           734|8456.89|    000000000049180|O6y2ExaDiS3zcTt|1883638063|      7720|       CWD|2017-01-02 04:23:28|
|           730|7969.15|    000000000049230|3K6OPAAK7GBPDOz|9908690440|      5161|       PER|2017-01-02 04:25:09|
|           815|6407.78|    000000000049280|n2YXilX5Te7CiDU|5784823589|      8521|       PER|2017-01-02 04:26:45|
|           770|5062.58|    000000000049330|0Rc6NTM4zMIKvJD|8445948935|      5253|       PER|2017-01-02 04:28:20|
|           595|5172.16|    000000000049380|hIQWBnm9K7Xyqfr|5237054012|      4373|       PER|2017-01-02 04:30:03|
+--------------+-------+-------------------+---------------+----------+----------+------

In [16]:
df = df.withColumn("Clr_Amt", functions.col("Clr_Amt").cast(DoubleType()))
df.dtypes

[('Agn_Org_Cnr_Cd', 'string'),
 ('Clr_Amt', 'double'),
 ('Exg_Plf_Sys_Srl_Nbr', 'string'),
 ('Main_EAC_Id', 'string'),
 ('Mch_Id', 'string'),
 ('Mch_Typ_Cd', 'string'),
 ('Trx_Typ_Cd', 'string'),
 ('timestamp', 'string')]

In [17]:
df.persist()

DataFrame[Agn_Org_Cnr_Cd: string, Clr_Amt: double, Exg_Plf_Sys_Srl_Nbr: string, Main_EAC_Id: string, Mch_Id: string, Mch_Typ_Cd: string, Trx_Typ_Cd: string, timestamp: string]

### 任务2：从原字段衍生出新字段，然后取合适的字段，直接作为每一条交易的特征，把特征矩阵保存成parquet文件  
#### 特征1：时间戳的小时数
#### 特征2：交易金额按1000, 10000分别做分桶
#### 特征3：商户类型取前2、3位前缀

注意：卡号和时间戳不能直接当做特征（请思考原因？）

In [53]:
def udf_get_hour():
    def f(x):
        return datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S").hour
    return functions.udf(f, IntegerType())

def udf_bucket(n):
    def f(x):
        return int(np.floor(x/n)) * n
    return functions.udf(f, IntegerType())

def udf_prefix(n):
    def f(x):
        return x[:n]
    return functions.udf(f, StringType())


In [54]:
df_derive = df.withColumn("timestamp_hour", udf_get_hour()("timestamp")
                         ).withColumn("amt_1000", udf_bucket(1000)("Clr_Amt")
                                     ).withColumn("amt_10000", udf_bucket(10000)("Clr_Amt")
                                     ).withColumn("mch_typ_2", udf_prefix(2)("Mch_Typ_Cd")
                                     ).withColumn("mch_typ_3", udf_prefix(3)("Mch_Typ_Cd")
                                     )
df_derive.show(5)

+--------------+-------+-------------------+---------------+----------+----------+----------+-------------------+--------------+--------+---------+---------+---------+
|Agn_Org_Cnr_Cd|Clr_Amt|Exg_Plf_Sys_Srl_Nbr|    Main_EAC_Id|    Mch_Id|Mch_Typ_Cd|Trx_Typ_Cd|          timestamp|timestamp_hour|amt_1000|amt_10000|mch_typ_2|mch_typ_3|
+--------------+-------+-------------------+---------------+----------+----------+----------+-------------------+--------------+--------+---------+---------+---------+
|           734|8456.89|    000000000049180|O6y2ExaDiS3zcTt|1883638063|      7720|       CWD|2017-01-02 04:23:28|             4|    8000|        0|       77|      772|
|           730|7969.15|    000000000049230|3K6OPAAK7GBPDOz|9908690440|      5161|       PER|2017-01-02 04:25:09|             4|    7000|        0|       51|      516|
|           815|6407.78|    000000000049280|n2YXilX5Te7CiDU|5784823589|      8521|       PER|2017-01-02 04:26:45|             4|    6000|        0|       85|   

In [None]:
# 选择一部分字段作为特征
cols_to_feature = ["Exg_Plf_Sys_Srl_Nbr", "Agn_Org_Cnr_Cd", "Mch_Id", 
                   "Mch_Typ_Cd", "mch_typ_2", "mch_typ_3", 
                   "timestamp_hour", "Clr_Amt", "amt_1000", "amt_10000", "Trx_Typ_Cd"]

df_derive.select(cols_to_feature).write.parquet("...") # 保存到自己的目录下

In [None]:
# TODO 与参考答案对比：/data/fresh_train/df_feature_single_event_answer_param

### 任务3：把每张卡片的所有交易groupby起来，按时间排序，然后对每条交易计算如下特征
#### 特征1：与前一条事件的对比，比如 与前一条交易的时间差，与前一条同国交易的时间差，与前一条同商户交易的时间差，与前一条交易的商户（或国家）是否相同
#### 特征2：当前事件的取值在历史事件中出现的次数和占比，比如 当前交易的国家（或商户）在此前出现过几次，占比是多少
#### 特征3：当前事件的金额与历史事件的对比，比如 当前交易金额与历史交易金额的均值的差值、比值以及zscore

In [67]:
# 添加一个字段：timestamp的float值，即，时间戳与1970-01-01的秒数差
def udf_ts_mktime(time_format="%Y-%m-%d %H:%M:%S"):
    def f(x):
        return time.mktime(datetime.datetime.strptime(x, time_format).timetuple())
    return functions.udf(f, DoubleType())

df = df.withColumn("ts_mktime", udf_ts_mktime()("timestamp"))
df.show(5)

+--------------+-------+-------------------+---------------+----------+----------+----------+-------------------+-------------+
|Agn_Org_Cnr_Cd|Clr_Amt|Exg_Plf_Sys_Srl_Nbr|    Main_EAC_Id|    Mch_Id|Mch_Typ_Cd|Trx_Typ_Cd|          timestamp|    ts_mktime|
+--------------+-------+-------------------+---------------+----------+----------+----------+-------------------+-------------+
|           734|8456.89|    000000000049180|O6y2ExaDiS3zcTt|1883638063|      7720|       CWD|2017-01-02 04:23:28|1.483302208E9|
|           730|7969.15|    000000000049230|3K6OPAAK7GBPDOz|9908690440|      5161|       PER|2017-01-02 04:25:09|1.483302309E9|
|           815|6407.78|    000000000049280|n2YXilX5Te7CiDU|5784823589|      8521|       PER|2017-01-02 04:26:45|1.483302405E9|
|           770|5062.58|    000000000049330|0Rc6NTM4zMIKvJD|8445948935|      5253|       PER|2017-01-02 04:28:20|  1.4833025E9|
|           595|5172.16|    000000000049380|hIQWBnm9K7Xyqfr|5237054012|      4373|       PER|2017-01-02 

In [68]:
rdd = df.rdd.map(list).persist()
schema = df.columns

In [106]:
# 先挑选某张卡片的事件，生成events，用于测试特征函数
ind_ts = schema.index("timestamp")
events = df.filter(functions.col("Main_EAC_Id")=="O6y2ExaDiS3zcTt").rdd.map(list).collect()
events = sorted(events, key=lambda x: x[ind_ts])
events

[[u'734',
  8456.89,
  u'000000000049180',
  u'O6y2ExaDiS3zcTt',
  u'1883638063',
  u'7720',
  u'CWD',
  u'2017-01-02 04:23:28',
  1483302208.0],
 [u'316',
  7104.6,
  u'000000003240334',
  u'O6y2ExaDiS3zcTt',
  u'5057487012',
  u'9937',
  u'PER',
  u'2017-03-17 01:13:04',
  1489684384.0]]

In [107]:
# 特征函数
def features(events, schema):
    """ 特征函数主函数 """
    ret_ls = []
    ind_nbr = schema.index("Exg_Plf_Sys_Srl_Nbr")
    for i,e in enumerate(events):
        current_event = e  # 当前交易
        history_events = events[:i] # 历史交易
        ret = dict()
        ret.update(feature1(current_event, history_events, schema))
        ret.update(feature2(current_event, history_events, schema))
        ret.update(feature3(current_event, history_events, schema))
        ret.update({"Exg_Plf_Sys_Srl_Nbr": e[ind_nbr]})
        ret_ls.append(ret)
    return ret_ls
        

def feature1(current_event, history_events, schema):
    """ 特征1 """
    ind_ts = schema.index("ts_mktime")
    ret = dict()
    if not history_events:
        ret["ts_diff"] = -1
        for col in ["Agn_Org_Cnr_Cd", "Mch_Id"]:
            ret["same_%s" % col] = -1
        for col in ["Agn_Org_Cnr_Cd", "Mch_Id"]:
            ret["ts_diff_same_%s" % col] = -1
        return ret
    
    ret["ts_diff"] = current_event[ind_ts] - history_events[-1][ind_ts]
    for col in ["Agn_Org_Cnr_Cd", "Mch_Id"]:
        ind = schema.index(col)
        if current_event[ind]==history_events[-1][ind]:
            ret["same_%s" % col] = 1
        else:
            ret["same_%s" % col] = 0
    
    for col in ["Agn_Org_Cnr_Cd", "Mch_Id"]:
        ind = schema.index(col)
        history_events_same = [e for e in history_events if e[ind]==current_event[ind]]
        if not history_events_same:
            ret["ts_diff_same_%s" % col] = -1
        else:
            ret["ts_diff_same_%s" % col] = current_event[ind_ts] - history_events_same[-1][ind_ts]
    return ret
    
def feature2(current_event, history_events, schema):
    """ 特征2 """
    ret = dict()
    for col in ["Agn_Org_Cnr_Cd", "Mch_Id"]:
        ind = schema.index(col)
        ret["history_%s_eventcnt" % col] = len([1 for e in history_events if e[ind]==current_event[ind]])
        if not history_events:
            ret["history_%s_eventratio" % col] = -1
        else:
            ret["history_%s_eventratio" % col] = 1.0*ret["history_%s_eventcnt" % col]/len(history_events)
    return ret

def feature3(current_event, history_events, schema):
    """ 特征3 """
    ret = dict()
    ind = schema.index("Clr_Amt")
    if not history_events:
        ret["amt_diff"] = -1
        ret["amt_diff_ratio"] = -1
        ret["amt_zscore"] = -1
        return ret
        
    amt_ls = [e[ind] for e in history_events]
    amt_avg = np.mean(amt_ls)
    amt_std = np.std(amt_ls)
    ret["amt_diff"] = float(current_event[ind] - amt_avg)
    ret["amt_diff_ratio"] = float(1.0*current_event[ind]/(amt_avg + 1e-8))
    ret["amt_zscore"] = float(1.0*(current_event[ind] - amt_avg)/(amt_std + 1e-8))
    return ret
    

In [108]:
features(events, schema)

[{'Exg_Plf_Sys_Srl_Nbr': u'000000000049180',
  'amt_diff': -1,
  'amt_diff_ratio': -1,
  'amt_zscore': -1,
  'history_Agn_Org_Cnr_Cd_eventcnt': 0,
  'history_Agn_Org_Cnr_Cd_eventratio': -1,
  'history_Mch_Id_eventcnt': 0,
  'history_Mch_Id_eventratio': -1,
  'same_Agn_Org_Cnr_Cd': -1,
  'same_Mch_Id': -1,
  'ts_diff': -1,
  'ts_diff_same_Agn_Org_Cnr_Cd': -1,
  'ts_diff_same_Mch_Id': -1},
 {'Exg_Plf_Sys_Srl_Nbr': u'000000003240334',
  'amt_diff': -1352.289999999999,
  'amt_diff_ratio': 0.8400960636819917,
  'amt_zscore': -135228999999.99991,
  'history_Agn_Org_Cnr_Cd_eventcnt': 0,
  'history_Agn_Org_Cnr_Cd_eventratio': 0.0,
  'history_Mch_Id_eventcnt': 0,
  'history_Mch_Id_eventratio': 0.0,
  'same_Agn_Org_Cnr_Cd': 0,
  'same_Mch_Id': 0,
  'ts_diff': 6382176.0,
  'ts_diff_same_Agn_Org_Cnr_Cd': -1,
  'ts_diff_same_Mch_Id': -1}]

In [109]:
ind_eacid = schema.index("Main_EAC_Id")
ind_ts = schema.index("timestamp")

rdd_feature = rdd.map(lambda x: (x[ind_eacid], x)
       ).groupByKey().map(lambda x: sorted(list(x[1]), key=lambda y: y[ind_ts])
                         ).flatMap(lambda x: features(x, schema)
                                  )
rdd_feature.take(2)

[{'Exg_Plf_Sys_Srl_Nbr': u'000000007289427',
  'amt_diff': -1,
  'amt_diff_ratio': -1,
  'amt_zscore': -1,
  'history_Agn_Org_Cnr_Cd_eventcnt': 0,
  'history_Agn_Org_Cnr_Cd_eventratio': -1,
  'history_Mch_Id_eventcnt': 0,
  'history_Mch_Id_eventratio': -1,
  'same_Agn_Org_Cnr_Cd': -1,
  'same_Mch_Id': -1,
  'ts_diff': -1,
  'ts_diff_same_Agn_Org_Cnr_Cd': -1,
  'ts_diff_same_Mch_Id': -1},
 {'Exg_Plf_Sys_Srl_Nbr': u'000000009753659',
  'amt_diff': -788.74,
  'amt_diff_ratio': 0.05169884819713253,
  'amt_zscore': -78874000000.0,
  'history_Agn_Org_Cnr_Cd_eventcnt': 0,
  'history_Agn_Org_Cnr_Cd_eventratio': 0.0,
  'history_Mch_Id_eventcnt': 0,
  'history_Mch_Id_eventratio': 0.0,
  'same_Agn_Org_Cnr_Cd': 0,
  'same_Mch_Id': 0,
  'ts_diff': 4929215.0,
  'ts_diff_same_Agn_Org_Cnr_Cd': -1,
  'ts_diff_same_Mch_Id': -1}]

In [111]:
# 生成特征矩阵，并保存起来
feature_schema = features(events, schema)[0].keys()

df_feature = spark.createDataFrame(rdd_feature.map(lambda x: [x[k] for k in feature_schema]), schema=feature_schema)

In [112]:
df_feature.write.mode("Overwrite").parquet(...) # 写入你的目录里

In [None]:
# TODO 与参考答案对比：/data/fresh_train/df_feature_single_event_answer_diff

### 任务4：计算全局统计特征，保存成df
#### 特征1：每个商户有多少条事件，join到每一条交易上
#### 特征2：每个国家出现过多少个卡片，join到每一条交易上
#### 特征3：计算全局交易金额的100个分位数，然后给每一条交易的金额赋值其所属的分位数值

In [114]:
# 特征1
df_feature1 = df.select("Mch_Id").groupby("Mch_Id").count().withColumnRenamed("count", "global_mchid_eventcnt")
df_feature1.show(5)

+----------+---------------------+
|    Mch_Id|global_mchid_eventcnt|
+----------+---------------------+
|4313943796|                  275|
|5815197939|                  279|
|9593487685|                  245|
|8649911703|                  272|
|5598132391|                  254|
+----------+---------------------+
only showing top 5 rows



In [115]:
# 特征2
df_feature2 = df.select(["Agn_Org_Cnr_Cd", "Main_EAC_Id"]
                       ).drop_duplicates().groupby("Agn_Org_Cnr_Cd"
                                                  ).count().withColumnRenamed("count", "global_cnr_eaccnt")
df_feature2.show(5)

+--------------+-----------------+
|Agn_Org_Cnr_Cd|global_cnr_eaccnt|
+--------------+-----------------+
|           574|           145185|
|           851|           144726|
|           495|           146813|
|           101|           133681|
|           986|           132078|
+--------------+-----------------+
only showing top 5 rows



In [116]:
# 特征3
def df_get_rank_map_dict(df, col, rank_range=100, err=0):
    """ 对单个字段按取值大小顺序排序，输出100分位数对应的取值字典 """
    col_count = df.count()
    if col_count < rank_range:
        rank_range = col_count
    prec_list = [1.0 * x / rank_range for x in range(rank_range)]
    return zip(df.approxQuantile(col, prec_list, err), prec_list)

amt_rank_map = df_get_rank_map_dict(df, "Clr_Amt")
amt_rank_map

[(0.5, 0.0),
 (100.38, 0.01),
 (200.39, 0.02),
 (300.68, 0.03),
 (400.93, 0.04),
 (501.23, 0.05),
 (601.31, 0.06),
 (700.99, 0.07),
 (800.7, 0.08),
 (900.61, 0.09),
 (1000.79, 0.1),
 (1100.25, 0.11),
 (1200.53, 0.12),
 (1300.71, 0.13),
 (1401.01, 0.14),
 (1500.94, 0.15),
 (1601.17, 0.16),
 (1701.13, 0.17),
 (1801.11, 0.18),
 (1900.98, 0.19),
 (2001.1, 0.2),
 (2100.74, 0.21),
 (2200.98, 0.22),
 (2301.15, 0.23),
 (2400.84, 0.24),
 (2500.78, 0.25),
 (2600.88, 0.26),
 (2701.16, 0.27),
 (2801.14, 0.28),
 (2901.47, 0.29),
 (3001.27, 0.3),
 (3101.28, 0.31),
 (3201.51, 0.32),
 (3301.82, 0.33),
 (3401.7, 0.34),
 (3502.2, 0.35),
 (3601.86, 0.36),
 (3701.54, 0.37),
 (3801.89, 0.38),
 (3901.73, 0.39),
 (4002.28, 0.4),
 (4102.31, 0.41),
 (4201.97, 0.42),
 (4301.68, 0.43),
 (4401.61, 0.44),
 (4501.83, 0.45),
 (4601.69, 0.46),
 (4701.74, 0.47),
 (4801.84, 0.48),
 (4902.24, 0.49),
 (5002.14, 0.5),
 (5102.13, 0.51),
 (5202.44, 0.52),
 (5302.36, 0.53),
 (5402.84, 0.54),
 (5502.95, 0.55),
 (5602.75, 0.56

In [117]:
import bisect

def udf_feature_rank(amt_rank_map):
    def f(x):
        values = [i[0] for i in amt_rank_map]
        index_ls = [i[1] for i in amt_rank_map]
        index_bisect = bisect.bisect_left(values, x) # 二分查找
        if index_bisect == len(values):
            index_bisect = len(values) - 1
        return index_ls[index_bisect]
    return functions.udf(f, DoubleType())

df_feature3 = df.select("Clr_Amt"
                       ).drop_duplicates().withColumn("global_amt_rank", udf_feature_rank(amt_rank_map)("Clr_Amt"))
df_feature3.show(5)

+-------+---------------+
|Clr_Amt|global_amt_rank|
+-------+---------------+
| 8920.4|            0.9|
|7475.33|           0.75|
|1268.66|           0.13|
|5760.11|           0.58|
|3411.23|           0.35|
+-------+---------------+
only showing top 5 rows



In [120]:
# 特征矩阵join回df
df_feature_final = df
df_feature_final = df_feature_final.join(df_feature1, how="left", on="Mch_Id")
df_feature_final = df_feature_final.join(df_feature2, how="left", on="Agn_Org_Cnr_Cd")
df_feature_final = df_feature_final.join(df_feature3, how="left", on="Clr_Amt")
df_feature_final.select(["Exg_Plf_Sys_Srl_Nbr", "global_mchid_eventcnt", "global_cnr_eaccnt", "global_amt_rank"]).show(5)

+-------------------+---------------------+-----------------+---------------+
|Exg_Plf_Sys_Srl_Nbr|global_mchid_eventcnt|global_cnr_eaccnt|global_amt_rank|
+-------------------+---------------------+-----------------+---------------+
|    000000018840546|                  255|           136695|           0.01|
|    000000012160437|                  294|           137955|           0.01|
|    000000009794165|                  292|           133681|           0.01|
|    000000015931474|                  285|           143701|           0.01|
|    000000016059207|                  267|           130228|           0.01|
+-------------------+---------------------+-----------------+---------------+
only showing top 5 rows



In [121]:
df_feature_final.select(["Exg_Plf_Sys_Srl_Nbr", "global_mchid_eventcnt", "global_cnr_eaccnt", "global_amt_rank"]
                       ).write.parquet(...)  # 写入你自己的目录

In [None]:
# TODO 与参考答案对比：/data/fresh_train/df_feature_single_event_answer_global

### 任务5：把任务2、3、4生成的特征全部join到一起，生成最终版特征矩阵

In [123]:
df_feature1 = spark.read.parquet("/data/fresh_train/df_feature_single_event_answer_param")
df_feature2 = spark.read.parquet("/data/fresh_train/df_feature_single_event_answer_diff")
df_feature3 = spark.read.parquet("/data/fresh_train/df_feature_single_event_answer_global")

df_feature1.count(), df_feature2.count(), df_feature3.count()  # 三个矩阵的样本数量需要相同

(13851968, 13851968, 13851968)

In [124]:
df_feature_final = df_feature1
df_feature_final = df_feature_final.join(df_feature2, how="inner", on="Exg_Plf_Sys_Srl_Nbr")
df_feature_final = df_feature_final.join(df_feature3, how="inner", on="Exg_Plf_Sys_Srl_Nbr")
df_feature_final.count()

13851968

In [126]:
df_feature_final.write.parquet(...) # 写入你自己的目录

In [None]:
# TODO 与参考答案对比：/data/fresh_train/df_feature_single_event_answer