In [2]:
import pandas as pd 
import gc
import os 
import warnings
import numpy as np 
import time
import re
from sklearn.preprocessing import OneHotEncoder,LabelEncoder
from contextlib import contextmanager
from pyspark.sql.functions import  udf
from pyspark.sql.types import  IntegerType
from pyspark.sql.types import  DoubleType
from pyspark.sql.functions import  *
warnings.filterwarnings('ignore')

In [3]:
# from pyspark import SparkService
# # # 创建local模式的SparkSession
# spark = SparkService.get_local_spark(executor_instances=3, driver_mem='10g')

In [4]:
from pyspark import SparkService
# 创建spark会话，自定义资源
# spark = SparkService.get_spark(executor_instances=1, per_executor_mem='1g', driver_mem='1g')
# 创建spark会话，使用默认资源
spark = SparkService.get_spark(executor_instances=4,per_executor_mem='4g',driver_mem='2g')


In [5]:
# # the spark short for SparkSession and sc short for SparkContext have already declared
spark.sql("show databases").show()
spark.sql("use qx_testing").show()
spark.sql("show tables").toPandas()



+----------------+
|    databaseName|
+----------------+
|        aip_demo|
|         aipdemo|
| chinese_segment|
|        customer|
|       dataworks|
|         default|
|   email_message|
|     outertables|
|      qx_testing|
|       sitevisit|
|   sitevisit_dev|
|suspect_phonenum|
+----------------+

++
||
++
++



Unnamed: 0,database,tableName,isTemporary
0,qx_testing,apma,False
1,qx_testing,app_events,False
2,qx_testing,app_labels,False
3,qx_testing,breastw,False
4,qx_testing,credit_bureau,False
5,qx_testing,credit_credit_card_balanced,False
6,qx_testing,credit_pos_cash_balanced,False
7,qx_testing,credit_previous_application,False
8,qx_testing,credit_test,False
9,qx_testing,credit_train,False


## 重新刷一遍表

In [4]:
paths='/opt/notebook/xuyinghao'

def createtTable(paths):
    for file in os.listdir():
        col_list =[]
        if 'csv' in file:
            print('table %s 正在导入'%(file.replace('.csv','')))
            start = time.time()
            dir_ = os.path.join(paths,file)
            df = pd.read_csv(dir_)
            df.replace(np.NAN,'',inplace=True)
            for col in df.columns:
                col_list.append(col.lower())
            df.columns = col_list
            df[col_list]=df[col_list].astype(str)
            spark_table = spark.createDataFrame(df,verifySchema=False)
            del df,col_list
            gc.collect()
            spark_table.write.saveAsTable(name='qx_testing.home_credit_%s'%(file.replace('.csv','')),mode='overwrite',partitionBy=None)
            end = time.time()
            print('table %s 导入成功!'%(file.replace('.csv','')))
            print('导入开销%f s'%(end-start))
 


In [5]:
# createtTable(paths)

### 定义一些辅助函数

In [3]:

def change_age_tobin(days_birth):
    
    x = - days_birth/365
    if x < 20: return 1
    elif x < 30: return 2
    elif x < 40: return 3
    elif x < 50: return 4
    elif x < 60: return 5
    else: 
        return 0
    
change_age_tobin = udf(change_age_tobin,returnType=IntegerType())
    
def cal_mean(df, group_cols, col, agg_name):
    """
    计算均值
    """
    gp = df.select(group_cols+[col]).groupBy(group_cols).agg(mean(col).alias(agg_name))
    df = df.join(gp,on=group_cols,how='left')
    del gp;gc.collect()
    return df
    


def cal_std(df, group_cols, col, agg_name):
    """
    计算标准差
    """
    gp = df[group_cols + [col]].groupby(group_cols).agg(stddev(col).alias(agg_name))
    df = df.join(gp, on=group_cols, how='left')
    del gp
    gc.collect()
    return df

def onehot_label_encoder(df):
    """
    将object类型的变量进行编码，小于等于2的进行标签编码，大于2的onehot编码,
    同时对于高基数类别型特征可以使用平均数编码
    """
    le = LabelEncoder()

    for col in df.columns:
        if df[col].dtype=='object'and len(df[col].unique())<=2:
            le.fit(df[col])
            df[col] = le.transform(df[col])
        else:
            if df[col].dtype=='object'and len(df[col].unique())>2:
                df = pd.get_dummies(df,columns=[col])
    return df
def read_table(file_or_path):
    df = pd.read_csv(file_or_path)
    col_list = []
    for col in df.columns:
        col_list.append(col.lower())
    df.columns = col_list
    return df 


def group(df_to_agg, prefix, aggregations, aggregate_by= 'sk_id_curr'):
    """
     对每个表按照主键groupby
    """
    agg_df = df_to_agg.groupby(aggregate_by).agg(aggregations)
    df_col =agg_df.columns
    rename_col =[]
    for col in df_col:
        tmp_col = re.split("\(|\)",col)
        tmp_col.remove('')
        tmp_col = tmp_col[::-1]
        rename_col.append(prefix+'_'.join(tmp_col))
    #映射后的列名
    mapping = dict(zip(df_col,rename_col))
  
    agg_df = agg_df.select([col(c).alias(mapping.get(c, c)) for c in df_col])
    
    return agg_df

    


@contextmanager
def timer(name):
    t0 = time.time()
    yield
    print("{} - done in {:.0f}s".format(name, time.time() - t0))


### 处理主表

In [7]:
def process_train_test_spark():
    df = spark.sql('select * from qx_testing.home_credit_application_train')
    
    #过滤掉异常样本以及用NAN替换掉异常值
    df = df.filter(df['code_gender']!='XNA').filter(df['amt_income_total']<20000000)
    df = df.replace(365243,np.nan,subset=['days_employed'])
    df = df.replace(0,np.nan,subset=['days_last_phone_change'])

    docs = [x for x in df.columns if 'flag_doc' in x]
    sum_string = udf(lambda arr:sum(arr),IntegerType())
    #将有flag_doc的列进行求和
    df = df.withColumn('document_count',sum_string(array(docs)))
    for col in docs:
        df = df.drop(col)
    
    df = df.withColumn('ext_sources_prod',df['ext_source_1']+df['ext_source_2']+df['ext_source_3'])
    df = df.withColumn('ext_source_weighted',df['ext_source_1']*2+df['ext_source_2']*1+df['ext_source_3']*3)
    df = df.withColumn('age_bin',change_age_tobin('days_birth'))
    
    
    #构建一些比例特征
    df = df.withColumn('credit_to_annuity_ratio',df['amt_credit'] / df['amt_annuity'])
    df = df.withColumn('credit_to_goods_ratio',df['amt_credit'] / df['amt_goods_price'])
    # 收入类型比列
    
    df = df.withColumn('annuity_to_income_ratio',df['amt_annuity'] / df['amt_income_total'])
    df = df.withColumn('credit_to_income_ratio',df['amt_credit'] / df['amt_income_total'])
    df = df.withColumn('income_to_employed_ratio',df['amt_income_total'] / df['days_employed'])
    df = df.withColumn('income_to_birth_ratio',df['amt_income_total'] / df['days_birth'])
    # 时间序列形式比列特征
    
    df = df.withColumn('employed_to_birth_ratio',df['days_employed'] / df['days_birth'])
    df = df.withColumn('car_to_birth_ratio',df['own_car_age'] / df['days_birth'])
    df = df.withColumn('car_to_employed_ratio',df['own_car_age'] / df['days_employed'])
    
    #统计特征
    #重点关注EXT_SOURCE_1，EXT_SOURCE_2，EXT_SOURCE_3三个字段
    df= df.withColumn('ext_sources_min',least('ext_source_1','ext_source_2','ext_source_3'))
    df= df.withColumn('ext_sources_max',greatest('ext_source_1','ext_source_2','ext_source_3'))
    df = df.withColumn('ext_sources_mean',(df['ext_source_1']+df['ext_source_2']+df['ext_source_3'])/3)
    
 
    group_col = ['organization_type', 'name_education_type', 'occupation_type', 'age_bin', 'code_gender']
  
    #根据group_col来计算分组后的ext_source_median
    df=cal_mean(df,group_col,'ext_sources_mean','group_ext_sources_median')
    df =cal_std(df,group_col,'ext_sources_mean','group_ext_sources_std')
#     #计算分组后收入的平均值
    df = cal_mean(df,group_col,'amt_income_total','group_income_mean')
    df = cal_std(df,group_col,'amt_income_total','group_income_std')
    #计算分组申请贷款的金额
    df = cal_mean(df,group_col,'amt_credit','group_credit_mean')
    df = cal_std(df,group_col,'amt_credit','group_credit_std')
    df = cal_mean(df,group_col,'amt_annuity','group_annuity_mean')
    df = cal_std(df,group_col,'amt_annuity','group_annuity_std')
    
#     #变量编码
#     df = onehot_label_encoder(df)
    
    return df

### 衍生bureau以及bureau_balance表特征

In [10]:
def process_bureau_and_balance():
    df_bureau = spark.sql('select * from qx_testing.home_credit_bureau')
    df_balance = spark.sql('select * from qx_testing.home_credit_bureau_balance')
    df = df_bureau.join(df_balance,on='sk_id_bureau',how='left')
    del df_bureau,df_balance;gc.collect()
    
    ##衍生一些比例特征
    df = df.withColumn('credit_sum_overdue_ratio',df['amt_credit_sum_overdue']/df['amt_credit_sum'])
    df = df.withColumn('debt_percentage',df['amt_credit_sum_debt']/df['amt_credit_sum'])
    df = df.withColumn('credit_to_annuity_ratio',df['amt_credit_sum']/df['amt_annuity'])
 
    
    #衍生一些统计特征
    group_col=['sk_id_bureau']
    df = cal_mean(df,group_col,'amt_credit_sum_debt','group_sum_debt_mean')
    df= cal_std(df,group_col,'amt_credit_sum_debt','group_sum_debt_std')
    df = cal_mean(df,group_col,'amt_credit_sum_overdue','group_sum_overdue_mean')
    df = cal_std(df,group_col,'amt_credit_sum_overdue','group_sum_overdue_std')
    
    #类别型字段编码
    bureau_agg ={
#     'sk_id_bureau': ['nunique'],
    'days_credit': ['min', 'max', 'mean'],
    'days_credit_enddate': ['min', 'max'],
    'amt_credit_max_overdue': ['max', 'mean'],
    'amt_credit_sum': ['max', 'mean', 'sum'],
    'amt_credit_sum_debt': ['max', 'mean', 'sum'],
    'amt_credit_sum_overdue': ['max', 'mean', 'sum'],
    'amt_annuity': ['mean'],
    'amt_credit_sum_debt':['mean'],
    'amt_credit_sum_overdue':['mean'],
    # 类别型特征
#     'status_0': ['mean'],
#     'status_1': ['mean'],
#     'status_2': ['mean'],
#     'status_C': ['mean'],
#     'status_X': ['mean'],
#     'credit_active_Active': ['mean'],
#     'credit_active_Closed': ['mean'],
#     'credit_active_Sold': ['mean'],
#     'credit_type_Mortgage': ['mean'],
#     'credit_type_Microloan': ['mean']
}
    #聚合特征
#     df = group(df,prefix='bureau_',aggregations=bureau_agg)

 
    return df
   
    

In [11]:
df = process_bureau_and_balance()

In [18]:
tmp = df.groupBy('sk_id_curr').agg({'days_credit':col})

AssertionError: all exprs should be Column

In [16]:
tmp.columns

['sk_id_curr', 'avg(days_credit)']

### 衍生previous_applicaton表特征

In [14]:
def process_previous_application():
    df = spark.sql('select * from qx_testing.home_credit_previous_application')
    #衍生一些比例特征以及差值特征
    df = df.withColumn('application_credit_diff',df['amt_application'] - df['amt_credit'])
    df = df.withColumn('application_to_credit_ratio',df['amt_application']/df['amt_credit'])
    df = df.withColumn('credit_to_annuity_ratio',df['amt_credit']/df['amt_annuity'])
    
    group_col = ['name_client_type','name_contract_status','name_contract_type','name_cash_loan_purpose','code_reject_reason']
    
    df = cal_mean(df,group_col,'amt_annuity','group_annuity_mean')
    df = cal_std(df,group_col,'amt_annuity','group_annuity_std')
    df = cal_mean(df,group_col,'amt_credit','group_amt_credity_mean')
    df = cal_std(df,group_col,'amt_credit','group_amt_credit_std')
    df = cal_mean(df,group_col,'amt_application','group_amt_application_mean')
    df = cal_std(df,group_col,'amt_application','group_amt_application_std')
    
    #将365243替换为nan
    subset = ['days_first_drawing','days_first_due','days_last_due_1st_version','days_last_due','days_termination']
    df = df.replace(365243,np.nan,subset=subset)
   
    
    return df
    
    
    

### 衍生 pos_cash表特征

In [17]:
def process_pos_cash_balance():
    df = spark.sql('select * from qx_testing.home_credit_pos_cash_balance')
    change_to_binary = udf(lambda col:1 if col >0 else 0 ,returnType=IntegerType())
    df = df.withColumn('late_payment',change_to_binary('sk_dpd'))
    df = df.withColumn('sk_dpd_diff',df['sk_dpd']-df['sk_dpd_def'])
    df = df.withColumn('instalment_diff',df['cnt_instalment'] - df['cnt_instalment_future'])
    
    df = cal_mean(df,['sk_id_curr'],'cnt_instalment','group_cnt_instalment_mean')
    df = cal_std(df,['sk_id_curr'],'cnt_instalment','group_cnt_instalment_std')
    df = cal_mean(df,['sk_id_curr'],'cnt_instalment_future','group_cnt_instalment_future_mean')
    df = cal_std(df,['sk_id_curr'],'cnt_instalment_future','group_cnt_instalment_future_std')
    
    #编码
    return df
  

### 衍生 installments_payment表特征


In [22]:
def process_payment():
    df = spark.sql('select * from qx_testing.home_credit_installments_payments')
    
    df = df.withColumn('days_payment_diff',df['days_instalment']-df['days_entry_payment'])
    
    def change_to_binary(col1,col2):
        if col1 == None and col2==None:
            return None
        elif col1==None:
            return 0
        elif col2 ==None:
            return 1
        elif col1-col2>0:
            return 1
        else: return 0
 
    
    change_to_binary = udf(change_to_binary,IntegerType())
    
    df = df.withColumn('flag_pay_more',change_to_binary('amt_payment','amt_instalment'))

    df = cal_mean(df,['sk_id_curr'],'days_instalment','group_instalment_mean')
    df = cal_std(df,['sk_id_curr'],'days_instalment','group_instalment_std')
    df = cal_mean(df,['sk_id_curr'],'amt_instalment','group_amt_instalment_mean')
    df = cal_std(df,['sk_id_curr'],'amt_instalment','group_amt_instalment_std')
    
    
    return df


    

In [23]:
df = process_payment()

### 衍生 credit_card表特征

In [4]:
def process_credit_card():
    df = spark.sql('select * from qx_testing.home_credit_credit_card_balance')
    df = df.withColumn('limit_use',df['amt_balance']/df['amt_credit_limit_actual'])
    df = df.withColumn('payment_div_min',df['amt_payment_current'] - df['amt_inst_min_regularity'])
    change_to_binary= udf(lambda col:1 if col >0 else 0)
    df = df.withColumn('late_payment',change_to_binary('sk_dpd'))
    
    df= df.withColumn('drawing_limit_ratio',df['amt_drawings_atm_current']/df['amt_credit_limit_actual'])
    
    return df


    

In [8]:
def main():
    with timer("application_train and application_test"):
        df = process_train_test()
        print("Application dataframe shape: ", df.shape)
    with timer("Bureau and bureau_balance data"):
        bureau_df = process_bureau_and_balance()
        df = df.join(bureau_df,on='sk_id_curr',how='left')
        print("Bureau dataframe shape: ", bureau_df.shape)
        del bureau_df; gc.collect()
    with timer("previous_application"):
        prev_df = process_previous_application()
        df = df.join(prev_df,on='sk_id_curr',how='left')
        print("Previous dataframe shape: ", prev_df.shape)
        del prev_df; gc.collect()
    with timer("pos_cash_balance"):
        pos_cash = process_pos_cash_balance()
        df = df.join(pos_cash,on='sk_id_curr',how='left')
        print("POS dataframe shape:",pos_cash.shape)
        del pos_cash;gc.collect()
    with timer("installments_payment"):
        install_pay = process_payment()
        df = df.join(install_pay,on='sk_id_curr',how='left')
        print("Installment_payment shape :",install_pay.shape)
        del install_pay;gc.collect()
    with timer("credit_card "):
        credit_card = process_credit_card()
        df = df.join(credit_card,on='sk_id_curr',how='left')
        print("Credit_card shape:",credit_card.shape)
        del credit_card;gc.collect()
    return  df 
 