## Overview

In [2]:
%run "/Users/K28979/SrmRkvLibPy"

In [3]:
from pyspark.sql.functions import regexp_replace, col
from sklearn.metrics import average_precision_score
import plotly.express as px


In [4]:
# File location and type
file_location = "/FileStore/tables/prac1train/data_train.csv"
file_location_test = "/FileStore/tables/data_test.csv"

file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_train_raw = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df_test_raw = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location_test)

In [5]:
display(df_train_raw)

id,time,customer_id,age_group,gender,zipcode_customer,merchant_id,zipcode_merchant,type,amount,fraud
0,18,'C1734879586','1','F','28007','M348934600','28007','es_transportation',24.03,0
1,125,'C1896147467','2','F','28007','M1823072687','28007','es_transportation',41.08,0
2,7,'C1590346257','1','F','28007','M348934600','28007','es_transportation',37.59,0
3,66,'C16891369','3','M','28007','M348934600','28007','es_transportation',51.59,0
4,140,'C635222317','3','F','28007','M1823072687','28007','es_transportation',20.17,0
5,124,'C1085689043','2','M','28007','M1823072687','28007','es_transportation',9.04,0
6,112,'C2064491438','5','F','28007','M348934600','28007','es_transportation',51.06,0
7,150,'C980181294','4','F','28007','M348934600','28007','es_transportation',35.0,0
8,79,'C261398093','3','F','28007','M1823072687','28007','es_transportation',20.22,0
9,46,'C1457086535','2','M','28007','M1823072687','28007','es_transportation',56.05,0


In [6]:
display(df_test_raw)

id,time,customer_id,age_group,gender,zipcode_customer,merchant_id,zipcode_merchant,type,amount
0,168,'C969635352','2','F','28007','M1823072687','28007','es_transportation',16.11
1,60,'C1067668919','2','F','28007','M348934600','28007','es_transportation',9.8
2,170,'C1347371891','2','M','28007','M1823072687','28007','es_transportation',30.81
3,124,'C1100053869','2','M','28007','M1823072687','28007','es_transportation',54.82
4,159,'C1110919755','1','F','28007','M348934600','28007','es_transportation',41.02
5,84,'C346428597','3','M','28007','M1823072687','28007','es_transportation',24.2
6,158,'C1645168081','3','F','28007','M348934600','28007','es_transportation',22.34
7,28,'C1793707767','4','F','28007','M1600850729','28007','es_fashion',38.92
8,152,'C813472343','4','M','28007','M1823072687','28007','es_transportation',11.09
9,55,'C703418892','2','F','28007','M348934600','28007','es_transportation',19.84


### 辅助函数

In [8]:
#除去单引号
def remove_quote(df):
  
  for col in df.columns:
  
    df = df.withColumn( col, regexp_replace(col , "'", ""))
  
  return df

#处理数据类型
def data_type(df):
  
  str_cols = ['customer_id','age_group','gender','merchant_id','type']
  int_cols_test = ['id','time','zipcode_customer','zipcode_merchant']
  int_cols = ['id','time','zipcode_customer','zipcode_merchant','fraud']
  double_cols = [ 'amount']
  
  if 'fraud' in df.columns:
  
    for col in df.columns:
      if col in int_cols:
          df = df.withColumn(col,f.col(col).cast("integer"))
      elif col in str_cols:
          df = df.withColumn(col,f.col(col).cast("string"))
      elif col in double_cols:
          df = df.withColumn(col,f.col(col).cast("double")) 
  else:
    
    for col in df.columns:
      if col in int_cols_test:
        df = df.withColumn(col,f.col(col).cast("integer"))
      elif col in str_cols:
        df = df.withColumn(col,f.col(col).cast("string"))
      elif col in double_cols:
        df = df.withColumn(col,f.col(col).cast("double")) 
        
  return df

#查是否有空值
def check_nullvalue(df):
  
  k = []
  
  for a in df.columns:
    ct = df.filter(f.col(a) == None).count()
    
    k.append(ct)
    
    if ct>0:
      print('null value of',a, ':', ct)

        
  if sum(kk) == 0:
    print('No null values')
    
#将dataframe转为array
def process_data(df):
    
    cat_arry = OneHotEncoder(handle_unknown="ignore").fit_transform(df.select("age_group","gender","type").collect())\
                .toarray().astype(np.float32)
    
    scale_ary = MinMaxScaler().fit_transform(
    np.array(df.select("amount","time","avg_sales","qty_ct").collect(),dtype="float").astype(np.float32))
    
    final_ary = np.concatenate((cat_arry,scale_ary),axis=1)
    
    return final_ary
  
#显示预测结果  
def result(y_vali,y_pred):
    print("AP value：",average_precision_score(y_vali, y_pred))

### data process

In [10]:
#df_test
df_test_noquote = remove_quote(df_test_raw)
df_test = data_type(df_test_noquote)

#df_train 
df_train_noquote = remove_quote(df_train_raw)
df_train = data_type(df_train_noquote)
df_train = df_train.filter(f.col("amount")>=0)

In [11]:
display(df_test)

id,time,customer_id,age_group,gender,zipcode_customer,merchant_id,zipcode_merchant,type,amount
0,168,C969635352,2,F,28007,M1823072687,28007,es_transportation,16.11
1,60,C1067668919,2,F,28007,M348934600,28007,es_transportation,9.8
2,170,C1347371891,2,M,28007,M1823072687,28007,es_transportation,30.81
3,124,C1100053869,2,M,28007,M1823072687,28007,es_transportation,54.82
4,159,C1110919755,1,F,28007,M348934600,28007,es_transportation,41.02
5,84,C346428597,3,M,28007,M1823072687,28007,es_transportation,24.2
6,158,C1645168081,3,F,28007,M348934600,28007,es_transportation,22.34
7,28,C1793707767,4,F,28007,M1600850729,28007,es_fashion,38.92
8,152,C813472343,4,M,28007,M1823072687,28007,es_transportation,11.09
9,55,C703418892,2,F,28007,M348934600,28007,es_transportation,19.84


In [12]:
for col in df_test.columns:
    df_test.select(col).describe().show()

In [13]:
for col in df_train.columns:
    df_train.select(col).describe().show()

In [14]:
def process_data(df):
    
    cat_arry = OneHotEncoder(handle_unknown="ignore").fit_transform(df.select("age_group","gender","type").collect())\
                .toarray().astype(np.float32)
    
    scale_ary = MinMaxScaler().fit_transform(
    np.array(df.select("amount","time","avg_sales","qty_ct").collect(),dtype="float").astype(np.float32))
    
    final_ary = np.concatenate((cat_arry,scale_ary),axis=1)
    
    return final_ary

In [15]:
#将数据分为train和validate集
trainDf, valiDf = df_train.randomSplit([0.75, 0.25],seed = 10)

#Upsampleing因为fraud的例子少，通过增加fraud的例子，利于模型找到fraud的规律
# fraudDf = trainDf.filter(f.col('fraud')==1)
# new_trainDf_union = trainDf.union(fraudDf)

#新加两个特征
#特征2： qty_ct 商铺的总流量
#特征3： avg_sales 商铺的平均销售量
embed2 = trainDf.withColumn("qty_ct", f.count("customer_id").over(Window.partitionBy("merchant_id"))).select("merchant_id","qty_ct").distinct()
embed3 = trainDf.withColumn("avg_sales", f.avg("amount").over(Window.partitionBy("merchant_id"))).select("merchant_id","avg_sales").distinct()

#将得到的三个特征通过join的形式赋予到train和validate集
new_trainDf = trainDf.join(embed2,"merchant_id","left").join(embed3,"merchant_id","left")
new_valiDf = valiDf.join(embed2,"merchant_id","left").join(embed3,"merchant_id","left")

#upsampling 平衡数据分布
fraudDf = new_trainDf.filter(f.col("fraud")==1)
new_trainDf_union = new_trainDf.union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf)
                               
                            
#将类型值用dummy处理，将数据值做normalization 
X_train = process_data(new_trainDf_union)
y_train = np.array(new_trainDf_union.select("fraud").collect(),dtype="float").astype(np.float32)

X_vali = process_data(new_valiDf)
y_vali = np.array(new_valiDf.select("fraud").collect(),dtype="float").astype(np.float32)

In [16]:
new_trainDf_union = new_trainDf.union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf).union(fraudDf)
                               
                            
#将类型值用dummy处理，将数据值做normalization 
X_train = process_data(new_trainDf_union)
y_train = np.array(new_trainDf_union.select("fraud").collect(),dtype="float").astype(np.float32)

X_vali = process_data(new_valiDf)
y_vali = np.array(new_valiDf.select("fraud").collect(),dtype="float").astype(np.float32)


classifier = XGBClassifier()
classifier.fit(X_train, y_train)
y_pred_xg = classifier.predict_proba(X_vali)
result(y_vali,y_pred_xg[:,1])

In [17]:
from xgboost import XGBClassifier

classifier = XGBClassifier()
classifier.fit(X_train, y_train)
y_pred_xg = classifier.predict_proba(X_vali)
result(y_vali,y_pred_xg[:,1])

### xgboost调参数

In [19]:
eta = [0.05,0.1,0.15] 
max_depth = [6,7,8] 
n_estimators = [100,300,500] 
ap = []

for lr in eta:
  for depth in max_depth:
    for n in n_estimators:
      model = XGBClassifier(learning_rate=lr,
                      n_estimators=n,         # 树的个数--棵树建立xgboost
                      max_depth=depth,               # 树的深度
                      min_child_weight = 1,      # 叶子节点最小权重
                      gamma=0.,                  # 惩罚项中叶子结点个数前的参数
                      subsample=0.8,             # 随机选择80%样本建立树
                      colsample_btree=0.8,       # 随机算哦80%样本选择特征
                      scale_pos_weight=1,        # 解决样本个数不平衡的问题
                      random_state=27            # 随机数
                      )
      
      model.fit(X_train, y_train)
      y_pred_xg = model.predict_proba(X_vali)
      outcome = average_precision_score(y_vali, y_pred_xg[:,1]) 
      ap.append(outcome)
      print('learning rate',lr,'max depth',depth,'n_estimator',n,'apvalue',outcome)

In [20]:
parameters = []
idx = 0

for lr in eta:
    for depth in max_depth:
      for n in n_estimators:
        parameters.append([lr,depth,n,ap[idx]])
        idx = idx+1
      
resultDf = sqlContext.createDataFrame([(float(lr),float(depth),float(n),float(score)) for lr,depth,n,score in parameters],["learning_rate","max_depth","n_estimator","AP_score"])
display(resultDf.orderBy(desc("AP_score")))

learning_rate,max_depth,n_estimator,AP_score
0.1,6.0,100.0,0.9152676455907172
0.05,6.0,300.0,0.9141722320115292
0.05,6.0,100.0,0.9134657879355264
0.05,7.0,100.0,0.913182102658036
0.15,6.0,100.0,0.9130033323898044
0.1,7.0,100.0,0.912956272466472
0.05,7.0,300.0,0.9114518047980716
0.05,8.0,100.0,0.9109272678354946
0.15,7.0,100.0,0.910308359688967
0.05,6.0,500.0,0.9102066925200568


####选最好的参数 learning_rate=0.1,max_depth=6,n_estimaor=100

### Predict test

In [23]:
#用df_train做两个新的特征
t_embed2 = df_train.withColumn("qty_ct", f.count("customer_id").over(Window.partitionBy("merchant_id"))).select("merchant_id","qty_ct").distinct()
t_embed3 = df_train.withColumn("avg_sales", f.avg("amount").over(Window.partitionBy("merchant_id"))).select("merchant_id","avg_sales").distinct()

#将得到的特征通过join的形式赋予到df_train和df_test集
new_df_train = df_train.join(t_embed2,"merchant_id","left").join(t_embed3,"merchant_id","left")
new_df_test = df_test.join(t_embed2,"merchant_id","left").join(t_embed3,"merchant_id","left")

#upsampling 平衡数据分布
new_fraudDf = new_df_train.filter(f.col("fraud")==1)
new_trainDf_union_t = new_df_train.union(new_fraudDf).union(new_fraudDf).union(new_fraudDf).union(new_fraudDf).union(new_fraudDf)

#将类型值用dummy处理，将数据值做normalization 
X_train_t = process_data(new_trainDf_union_t)
y_train_t = np.array(new_trainDf_union_t.select("fraud").collect(),dtype="float").astype(np.float32)

X_test_t = process_data(new_df_test)

In [24]:
display(new_trainDf_union_t.describe())

summary,merchant_id,id,time,customer_id,age_group,gender,zipcode_customer,zipcode_merchant,type,amount,fraud,qty_ct,avg_sales
count,313783,313783.0,313783.0,313783,313783,313783,313783.0,313783.0,313783,313783.0,313783.0,313783.0,313783.0
mean,,148251.7107714567,94.75919664226552,,3.021627950752464,,28007.0,28007.0,,65.20988587654544,0.066083885997648,105715.75246268918,57.51724814429238
stddev,,85567.67908217182,51.036171807866175,,1.3598371351163803,,0.0,0.0,,254.01687214669067,0.2484290697482661,54561.55564072902,178.00028350871327
min,M1053599405,0.0,0.0,C1000148617,0,E,28007.0,28007.0,es_barsandrestaurants,0.0,0.0,2.0,26.88540076030433
max,M980657600,296507.0,179.0,C999723254,U,U,28007.0,28007.0,es_wellnessandbeauty,7635.41,1.0,149940.0,2295.4177887788765


In [25]:
display(new_df_test.describe())

summary,merchant_id,id,time,customer_id,age_group,gender,zipcode_customer,zipcode_merchant,type,amount,qty_ct,avg_sales
count,298135,298135.0,298135.0,298135,298135,298135,298135.0,298135.0,298135,298135.0,298135.0,298135.0
mean,,149067.0,94.99986247840744,,2.9875581477482567,,28007.0,28007.0,,38.13674503161288,111473.1921646234,37.78154309628468
stddev,,86064.3055898708,51.12165737741309,,1.2884120363598184,,0.0,0.0,,111.6270587439932,49909.07010823207,82.89790046308327
min,M1053599405,0.0,0.0,C100045114,0,E,28007.0,28007.0,es_barsandrestaurants,0.0,2.0,26.88540076030433
max,M980657600,298134.0,179.0,C999393223,U,U,28007.0,28007.0,es_wellnessandbeauty,8329.96,149940.0,2295.4177887788765


In [26]:
model_t = XGBClassifier(learning_rate=0.01,
                    n_estimators=100,         # 树的个数--100棵树建立xgboost
                    max_depth=6,               # 树的深度
                    min_child_weight = 1,      # 叶子节点最小权重
                    gamma=0.,                  # 惩罚项中叶子结点个数前的参数
                    subsample=0.8,             # 随机选择80%样本建立树
                    colsample_btree=0.8,       # 随机算哦80%样本选择特征
                    scale_pos_weight=1,        # 解决样本个数不平衡的问题
                    random_state=27            # 随机数
                    )
      
model_t.fit(X_train_t, y_train_t)
y_pred_test = model_t.predict_proba(X_test_t)

In [27]:
outcome = y_pred_test[:,1]
df_outcome = sqlContext.createDataFrame( [ (idx,float(i)) for idx,i in enumerate(outcome)],["id","result"])

In [28]:
display(df_outcome)

id,result
0,0.1834888905286789
1,0.1834888905286789
2,0.1834888905286789
3,0.1834888905286789
4,0.1834888905286789
5,0.1834888905286789
6,0.1834888905286789
7,0.1911083161830902
8,0.1834888905286789
9,0.1834888905286789
