In [1]:
import tensorflow as tf
import logging
import argparse
import pyspark.sql.functions as F
from argparse import RawTextHelpFormatter
from time import gmtime, strftime

from pyspark.sql import HiveContext
from pyspark import SparkConf
import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.types import DoubleType, IntegerType, BooleanType, StringType, NullType, DateType, TimestampType

description= ""
epilog= ""
APP_NAME= "pyspark_data_clean"

def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.ERROR)


def spark_context():
    """ fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    # debug use
    conf = (SparkConf().setAppName(APP_NAME))
    sc = SparkContext(conf=conf)
    quiet_py4j()
    return sc

def get_clean_data(df_raw, is_realtime):

    # Process 43 features according to guidelines
    category_feature= [
        "reimbursementId",
        "allocationType",
        "applicantLevel",
        "legalEntityCode",
        "regionCode",
        "appEmployeeName",
        "costCenterCode",
        "organizationCode",
        "vendorCode",
        "expenseId",
        "city",
        "destination",
        "expensePaymentType",
        "flightNo",
        "mileage",
        "placeOfDeparture",
        "transportation",
        "s_reimbursementId",
        "expenseTypeCode",
        "expenseTypeName",
        "s_vendorCode",
        "cid_dbsn",
        "uid"
    ]
    numeric_feature= [
        "cashAdvance",
        "deductAmount",
        "paidByCompany",
        "totalBalanceOfReceivable",
        "totalPayableAmount",
        "totalReimbursementAmount",
        "applyExpenseAmount",
        "expenditure",
        "quantity",
        "unitPrice"
    ]
    if is_realtime:
        bool_feature= [
            "isPurchase",
            "isRoundTrip",
            "isHighRisk"
        ]
    else:
        bool_feature= [
            "isPurchase",
            "isRoundTrip",
            "isHighRisk",
            "sizeOfNcList"
        ]
    text_feature= [
        "comments",
        "deductDemo",
        "subject",
        "expenseDescription",
        "purpose"
    ]
    time_series_feature1= [
        "beginDate",
        "endDate"
    ]
    time_series_feature2= [
        "applyDate",
        "transactionDate"
    ]

    fillempty = F.udf(lambda s: "null" if(s == "") else s, StringType())

    # Process category features according to guidelines
    for col_tmp in category_feature:
        df_raw = df_raw.withColumn(col_tmp, df_raw[col_tmp].cast("string"))
    #    df_raw = df_raw.withColumn(col_tmp, fillempty(df_raw[col_tmp]))

    # Process numeric features according to guidelines
    for col_tmp in numeric_feature:
        df_raw = df_raw.withColumn(col_tmp, df_raw[col_tmp].cast("double"))
    # TODO:01 test 1, test quantity 0 filled with 1
    df_raw = df_raw.withColumn(
        "quantity",
        F.when(df_raw["quantity"]==0, 1).otherwise(df_raw["quantity"])
        )

    # Process bool features according to guidelines
    for col_tmp in bool_feature:
        df_raw = df_raw.withColumn(col_tmp, df_raw[col_tmp].cast("boolean"))

    if not is_realtime:
        df_raw = df_raw.withColumnRenamed("sizeOfNcList","isNC")

    # Process text features according to guidelines
    for col_tmp in text_feature:
        df_raw = df_raw.withColumn(col_tmp, df_raw[col_tmp].cast("string"))
    #    df_raw = df_raw.withColumn(col_tmp, fillempty(df_raw[col_tmp]))


    # Process time series features according to guidelines
    for col_tmp in time_series_feature1:
        df_raw = df_raw.withColumn(
            col_tmp,
            F.to_date(
                F.unix_timestamp(
                    df_raw[col_tmp].cast("string"),"yyyyMMdd"
                    ).cast("timestamp")))

    for col_tmp in time_series_feature2:
        df_raw = df_raw.withColumn(
            col_tmp,
            F.to_date(
                F.from_unixtime(
                    df_raw[col_tmp]/1000,"yyyy-MM-dd")))

    # add year, month, day column in history data
    df_clean_data = df_raw.withColumn(
        "year",
        F.year(df_raw["applyDate"])).withColumn(
            "month",
            F.month(df_raw["applyDate"])).withColumn(
                "day",
                F.dayofmonth(df_raw["applyDate"]))

    columns = [
        "uid",
        "allocationtype",
        "appemployeename",
        "applicantlevel",
        "applydate",
        "applyexpenseamount",
        "begindate",
        "cashadvance",
        "city",
        "comments",
        "costcentercode",
        "deductamount",
        "deductdemo",
        "destination",
        "enddate",
        "expenditure",
        "expensedescription",
        "expenseid",
        "expensepaymenttype",
        "expensetypecode",
        "expensetypename",
        "flightno",
        "ishighrisk",
        "ispurchase",
        "isroundtrip",
        "legalentitycode",
        "mileage",
        "organizationcode",
        "paidbycompany",
        "placeofdeparture",
        "purpose",
        "quantity",
        "regioncode",
        "reimbursementid",
        "s_reimbursementid",
        "s_vendorcode",
        "subject",
        "submitted_date",
        "totalbalanceofreceivable",
        "totalpayableamount",
        "totalreimbursementamount",
        "transactiondate",
        "transportation",
        "unitprice",
        "vendorcode",
        "cid_dbsn"
    ]

    if not is_realtime:
        columns.extend(["isNC","year","month","day"])

    df_clean_data = df_clean_data.select(columns)

    return df_clean_data

# TODO:02 test 2, nc daily count
def calculate_daily_NC(df_clean_data, field_name):
    # count nc for cleaned daily data
    df_NC = df_clean_data.filter(
        df_clean_data["isnc"] == True).groupby(
            df_clean_data["cid_dbsn"],
            df_clean_data[field_name],
            df_clean_data["applydate"].alias("year_month_day")
            ).count().withColumnRenamed("count","nccount")
    return df_NC
        
# TODO:03 test 3, nc count for 3 months
def calculate_recent3m_NC(df_nc, field_name, current_date, duration = 90):

    df_nc = df_nc.withColumn(
        "historydays",
        F.datediff(
            F.lit(current_date),
            df_nc["year_month_day"])
        )
    #df_nc.show()
    df_agg = df_nc.filter(
        df_nc["historydays"] < duration).groupby(
            "cid_dbsn",
            field_name).agg(
                F.sum(
                    df_nc["nccount"]).alias(
                        "recent_total_nccount"))
    return df_agg

# TODO:04 test 4, expense quantity daily count
def calculate_daily_expense_quantity(df_clean_data, field_name, alias_name):

    df_total = df_clean_data.groupby(
        "cid_dbsn",
        "appemployeename",
        df_clean_data["applydate"].alias("year_month_day")
        ).agg(
            F.sum(
                df_clean_data[field_name]).alias(alias_name))

    return df_total

# TODO:05 test 5, expense quantity count for recent 1 month
def calculate_recent1m_expense(df_expense, field_name, alias_name, current_date, duration = 30):

    df_expense = df_expense.withColumn(
        "historydays",
        F.datediff(
            F.lit(current_date),
            df_expense["year_month_day"])
        )
    #df_expense.show()
    df_agg = df_expense.filter(
        df_expense["historydays"] < duration).groupby(
            "cid_dbsn",
            "appemployeename").agg(
                F.sum(
                    df_expense[field_name]).alias(alias_name))

    return df_agg

In [6]:
fn='/Volumes/data/Dropbox/Accenture/projects/iaudit_TnE_from_QianWei/data/sample3.parquet/part-r-00000-2275da01-e816-4f46-889a-fda29405b72d.gz.parquet'
sql_file = '/Volumes/data/Dropbox/Accenture/projects/iaudit_TnE_from_QianWei/source/db/hive.sql'
current_date = strftime("%Y-%m-%d", gmtime())
HistNC_d = 500
HistExp_d = 500

# DONE:20 Read raw data from previous step (ether history feed or hourly feed)
try:
    sc = spark_context()
except:
    print "sc already loaded"

hiveContext = HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df_raw = hiveContext.read.format('parquet').load(fn).dropDuplicates(['uid'])

l = len(df_raw.columns)
print "number of columns = %i" %(l)
print "number of rows = %i" %(df_raw.count())

df_clean_data = get_clean_data(df_raw, 0)

# Todo: Revise to use op.db_db

#query_db = "use {}_db".format(op.db)
#hiveContext.sql(query_db)

sc already loaded
number of columns = 47
number of rows = 1000


In [7]:
df_raw_collect = df_raw.collect()

In [11]:
print df_raw_collect[1]

Row(uid=u'2_4|10415|84364', allocationType=u'\u62a5\u9500\u5355\u5206\u644a', appEmployeeName=u'davidysun(\u5b59\u745c)', applicantLevel=u'2', applyDate=1463068800000, applyExpenseAmount=500.0, beginDate=20160310, cashAdvance=0.0, city=u'\u5317\u4eac/\u4e0a\u6d77', comments=u'2016\u5e743\u670810\u65e5-11\u65e5\uff0c\u51fa\u5dee\u5317\u4eac\uff0c\u62dc\u8bbf\u6dd8\u5b9d\u3001\u5b66\u9738\u541b\u5ba2\u6237\u3002', costCenterCode=u'0104A', deductAmount=0.0, deductDemo=u'', destination=u'', endDate=20160311, expenditure=0.0, expenseDescription=u'', expenseId=u'84364', expensePaymentType=u'', expenseTypeCode=u'', expenseTypeName=u'\u5dee\u65c5\u8d39_\u56fd\u5185\u5dee\u65c5\u8d39_\u4f4f\u5bbf\u8d39', flightNo=u'', isHighRisk=0, isPurchase=False, isRoundTrip=0, legalEntityCode=u'T01', mileage=u'', organizationCode=u'0104A', paidByCompany=0.0, placeOfDeparture=u'', purpose=u'', quantity=0.0, regionCode=u'', reimbursementId=u'10415', s_reimbursementId=u'10415', s_vendorCode=u'null', subject=u'

In [12]:
try:
    hiveContext.sql('use db_dropbox')
except:
    with open(sql_file) as sql_init:
        sql_init_str = sql_init.read()
        sql_query = sql_init_str.replace('\r\n', '').split(';')
        for item in sql_query:
            try:
                hiveContext.sql(item)
            except:
                print("SQL error!")
    # DONE:0 Save processed data into hive table history_data
    df_clean_data.write.insertInto("history_record",overwrite=False)

    #df_clean_data.write.partitionBy('year','month','day').insertInto("history_record",overwrite=False)

    print "History/daily process >> data clean: {}".format("done!")


    hiveContext.sql('show tables').show()

+----------+--------------------+-----------+
|  database|           tableName|isTemporary|
+----------+--------------------+-----------+
|db_dropbox| costcenter_daily_nc|      false|
|db_dropbox|costcenter_histor...|      false|
|db_dropbox|employee_daily_ex...|      false|
|db_dropbox|   employee_daily_nc|      false|
|db_dropbox|employee_daily_qu...|      false|
|db_dropbox|employee_history_...|      false|
|db_dropbox| employee_history_nc|      false|
|db_dropbox|employee_history_...|      false|
|db_dropbox|             feature|      false|
|db_dropbox|  feature_importance|      false|
|db_dropbox|      history_record|      false|
|db_dropbox|legalentity_daily_nc|      false|
|db_dropbox|legalentity_histo...|      false|
|db_dropbox|organization_dail...|      false|
|db_dropbox|organization_hist...|      false|
|db_dropbox|              output|      false|
|db_dropbox|   output_evaluation|      false|
|db_dropbox|  prediction_history|      false|
+----------+--------------------+-

In [16]:
df_clean_data.write.insertInto("history_record",overwrite=False)
data_history_record = hiveContext.sql('select * from history_record').collect()

In [24]:
x = df_clean_data.toPandas()

                  uid allocationtype   appemployeename applicantlevel  \
0     2_4|10415|84363          报销单分摊     davidysun(孙瑜)              2   
1     2_4|10415|84364          报销单分摊     davidysun(孙瑜)              2   
2      2_4|6184|45118          报销单分摊        jianhe(何剑)              2   
3      2_4|9163|73880          报销单分摊     byronwang(王刚)              2   
4       2_4|1964|9362          报销单分摊      beatafu(付红艳)              2   
5      2_4|2338|12389          报销单分摊     tedgzcao(曹冠中)              2   
6    2_4|24488|203587          报销单分摊         haoma(马好)              2   
7      2_4|2482|13728          报销单分摊    kevinxhxu(许辉旭)              2   
8      2_4|2643|14977          报销单分摊     teemoliu(刘沛城)              2   
9      2_4|3988|26176          报销单分摊     loiszhang(张璐)              2   
10     2_4|3989|26191          报销单分摊     loiszhang(张璐)              2   
11     2_4|6184|45123          报销单分摊        jianhe(何剑)              2   
12     2_4|6185|45148          报销单分摊        jianhe(

In [26]:
x.to_csv('/Users/huaijianzhang/nlp/iaudit_1000_dataset.csv',encoding='utf-8')

In [11]:
for id, table_name in (
        ("costcentercode","costcenter_daily_nc"),
        ("legalentitycode","legalentity_daily_nc"),
        ("organizationcode","organization_daily_nc"),
        ("appemployeename","employee_daily_nc")
    ):
    df_daily_nc = calculate_daily_NC(df_clean_data, id)
    # append to daily table
    df_daily_nc.write.insertInto(table_name, overwrite= False)

for table_name, field_name, alias_name in (
        ("employee_daily_expense", "applyexpenseamount", "totalexpense"),
        ("employee_daily_quantity", "quantity", "totalquantity")
    ):
    df_daily_expense_quantity = calculate_daily_expense_quantity(df_clean_data, field_name, alias_name)
    # append to daily table
    df_daily_expense_quantity.write.insertInto(table_name, overwrite = False)

print "calculate daily NC and expense >> {}".format("done!")

for id, daily_table_name, history_table_name in (
        ("costcentercode","costcenter_daily_nc","costcenter_history_nc"),
        ("legalentitycode","legalentity_daily_nc","legalentity_history_nc"),
        ("organizationcode","organization_daily_nc","organization_history_nc"),
        ("appemployeename","employee_daily_nc","employee_history_nc")
    ):
    df_nc = hiveContext.table(daily_table_name)
    df_recent_NC = calculate_recent3m_NC(df_nc, id , current_date, HistNC_d)
    df_recent_NC.write.insertInto(history_table_name, overwrite = True)

for field_name, alias_name, daily_table_name, history_table_name in (
        ("totalexpense","recent_total_expense","employee_daily_expense","employee_history_expense"),
        ("totalquantity","recent_total_quantity","employee_daily_quantity","employee_history_quantity")
    ):
    df_expense = hiveContext.table(daily_table_name)
    df_recent_expense = calculate_recent1m_expense(df_expense, field_name, alias_name, current_date, HistExp_d)
    df_recent_expense.write.insertInto(history_table_name, overwrite = True)

print "calculate recent NC and expense history >> {}".format("done!")

calculate daily NC and expense >> done!
calculate recent NC and expense history >> done!


In [13]:
import logging
import argparse
import pyspark.sql.functions as F
from argparse import RawTextHelpFormatter
from time import gmtime, strftime

from pyspark import HiveContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.streaming import StreamingContext
from pyspark.sql.types import DoubleType, IntegerType, BooleanType, StringType, NullType, DateType, TimestampType


description= ""
epilog= ""
APP_NAME= "pyspark_features"

def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.ERROR)

def spark_context():
    """ fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    conf = (SparkConf().setAppName(APP_NAME))
    sc = SparkContext(conf=conf)
    quiet_py4j()
    return sc

def normalize_features(df_feature):
    numeric_feature= [
        "cashAdvance",
        "deductAmount",
        #"paidByCompany",
        #"totalBalanceOfReceivable",
        "totalPayableAmount",
        "totalReimbursementAmount",
        "applyExpenseAmount",
        #"expenditure",
        #"quantity",
        #"unitPrice",
        #"duration",
        #"totalexpensequantity",
        #"costcenterhistorync",
        #"legalentityhistorync",
        #"organizationhistorync",
        #"employeehistorync",
        "employeerecentexpense"
        #"employeerecentexpensequantity"
    ]
    for col in numeric_feature:
        df_feature = df_feature.withColumn(col, F.log1p(df_feature[col]))

    return df_feature

In [14]:
def get_record_features(df_clean_data):
    # TODO:01 test 1, duration calculation
    # TODO:02 test 2, totalquantity calculation

    df_clean_data= df_clean_data.withColumn(
        "duration",
        F.datediff(
            df_clean_data["enddate"],
            df_clean_data["begindate"]) + 1)

    df_totalQuantity = df_clean_data.groupby(
        df_clean_data["reimbursementid"], df_clean_data["cid_dbsn"]).agg(
            F.sum(df_clean_data["quantity"]).alias(
                "totalexpensequantity"))

    df_feature_data = df_clean_data.join(
        df_totalQuantity,
        ["reimbursementid", "cid_dbsn"],
        "left_outer")

    return df_feature_data

In [15]:
def get_features(df_clean_data, hiveContext, is_realtime):
    # TODO:03 test 3, test data correctly referenced: costcenterhistorync, legalentityhistorync, organizationhistorync, employeehistorync, employeerecentexpense, employeerecentexpensequantity

    # add detail and master record features
    df_feature_data = get_record_features(df_clean_data)

    # read hive tables of aggregated nc count/expense
    # TODO: remove use tne_db
    costcenter_agg_nc = hiveContext.table(
        "costcenter_history_nc").withColumnRenamed(
            "recent_total_nccount",
            "costcenterhistorync")

    legalentity_agg_nc = hiveContext.table(
        "legalentity_history_nc").withColumnRenamed(
            "recent_total_nccount",
            "legalentityhistorync")

    organization_agg_nc = hiveContext.table(
        "organization_history_nc").withColumnRenamed(
            "recent_total_nccount",
            "organizationhistorync")

    employee_agg_nc = hiveContext.table(
        "employee_history_nc").withColumnRenamed(
            "recent_total_nccount",
            "employeehistorync")

    employee_agg_expense = hiveContext.table(
        "employee_history_expense").withColumnRenamed(
            "recent_total_expense",
            "employeerecentexpense")

    employee_agg_quantity = hiveContext.table(
        "employee_history_quantity").withColumnRenamed(
            "recent_total_quantity",
            "employeerecentexpensequantity")

    # join 4 nc history features and 2 employee expense features
    df_feature_data = df_feature_data.join(
        costcenter_agg_nc,
        ["cid_dbsn","costcentercode"],
        "left_outer").join(
            legalentity_agg_nc,
            ["cid_dbsn","legalentitycode"],
            "left_outer").join(
                organization_agg_nc,
                ["cid_dbsn","organizationcode"],
                "left_outer" ).join(
                    employee_agg_nc,
                    ["cid_dbsn","appemployeename"],
                    "left_outer").join(
                        employee_agg_expense,
                        ["cid_dbsn","appemployeename"],
                        "left_outer").join(
                            employee_agg_quantity,
                            ["cid_dbsn","appemployeename"],
                            "left_outer")

    if is_realtime:
        df_feature_data= df_feature_data.withColumn(
            "isnc",
            F.isnull(df_feature_data["uid"]))

    df_features = df_feature_data.fillna(
            {'duration': 1,
             'totalexpensequantity': 1,
             'costcenterhistorync': 0,
             'legalentityhistorync': 0,
             'organizationhistorync': 0,
             'employeehistorync': 0,
             'employeerecentexpense': 0.0,
             'employeerecentexpensequantity': 0.0,
             # category
             'uid':"null",
             'reimbursementId':"null",
             'allocationType':"null",
             'applicantLevel':"null",
             'legalEntityCode':"null",
             'regionCode':"null",
             'appEmployeeName':"null",
             'costCenterCode':"null",
             'organizationCode':"null",
             'vendorCode':"null",
             'expenseId':"null",
             'city':"null",
             'destination':"null",
             'expensePaymentType':"null",
             'flightNo':"null",
             'mileage':"null",
             'placeOfDeparture':"null",
             'transportation':"null",
             'expenseTypeCode':"null",
             'expenseTypeName':"null",
             'cid_dbsn':"null",
             # text
             'comments':"null",
             'deductDemo':"null",
             'subject':"null",
             'expenseDescription':"null",
             'purpose':"null",
             # numeric
             'cashAdvance':0.0,
             'deductAmount':0.0,
             'paidByCompany':0.0,
             'totalBalanceOfReceivable':0.0,
             'totalPayableAmount':0.0,
             'totalReimbursementAmount':0.0,
             'applyExpenseAmount':0.0,
             'expenditure':0.0,
             'quantity':1.0,
             'unitPrice':0.0,
             # bool
             'isPurchase':False,
             'isRoundTrip':False,
             'isHighRisk':False,
             'isnc': False})

    df_features = normalize_features(df_features)

    columns=[
        "uid",
        "allocationtype",
        "appemployeename",
        "applicantlevel",
        "applyexpenseamount",
        "begindate",
        "cashadvance",
        "city",
        "comments",
        "costcentercode",
        "deductamount",
        "deductdemo",
        "destination",
        "enddate",
        "expenditure",
        "expensedescription",
        "expenseid",
        "expensepaymenttype",
        "expensetypecode",
        "expensetypename",
        "flightno",
        "ispurchase",
        "isroundtrip",
        "legalentitycode",
        "mileage",
        "organizationcode",
        "paidbycompany",
        "placeofdeparture",
        "purpose",
        "quantity",
        "regioncode",
        "reimbursementid",
        "subject",
        "submitted_date",
        "totalbalanceofreceivable",
        "totalpayableamount",
        "totalreimbursementamount",
        "transportation",
        "unitprice",
        "vendorcode",
        "cid_dbsn",
        "duration",
        "totalexpensequantity",
        "costcenterhistorync",
        "legalentityhistorync",
        "organizationhistorync",
        "employeehistorync",
        "employeerecentexpense",
        "employeerecentexpensequantity",
        "isnc"
    ]

    return df_features.select(columns)

In [16]:
df_history_data = hiveContext.table("history_record")

In [17]:
Hist_d=500
df_clean_data = df_history_data.withColumn(
        "historydays",
        F.datediff(
            F.lit(current_date),
            F.date_format(df_history_data["applyDate"],"yyyy-MM-dd"))
)
# select recent 3 month history data to do feature engineering
# df_recent3m_history = df_clean_data.filter(df_clean_data["historydays"] < Hist_d).dropDuplicates(['uid'])

In [18]:
df_features = get_features(df_clean_data, hiveContext, 0)

In [19]:
len(df_features.columns)

50

In [20]:
df_features.write.insertInto("feature",overwrite=True)

In [160]:
import logging
import argparse
from argparse import RawTextHelpFormatter
import re

from pyspark import HiveContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.streaming import StreamingContext
from pyspark.sql.types import DoubleType, IntegerType, BooleanType, StringType, NullType, DateType, TimestampType
import h2o
from pysparkling import *
from h2o.estimators.deeplearning import H2OAutoEncoderEstimator

In [22]:
description= ""
epilog= ""
APP_NAME= "pyspark_model"
MODEL_PATH= "/Volumes/data/Dropbox/Accenture/TnE_from_QianWei/source/analysis/model_save/"

In [23]:
def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.ERROR)

def spark_context():
    """ fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    conf = (SparkConf().setAppName(APP_NAME))
    sc = SparkContext(conf=conf)
    quiet_py4j()
    return sc

def decode_string(x):
    x= unicode(x)
    for pair in re.findall(r'(\<0x(\w+)\>)',x):
        if pair == []:
            return x
        else:
            full, part = pair
            x = x.replace(full,part.decode("hex").decode("utf8"))

    return x

def set_model_path(db):
    global MODEL_PATH
    MODEL_PATH = MODEL_PATH + db + "/"

In [24]:
def train_n_prediction(df_feature, df_feature_h2o, sc, hiveContext, hc):

    x= [#"allocationtype",
        "applicantlevel",
        #"legalentitycode", "regioncode",
        "appemployeename", #"costcentercode",
        "organizationcode",# "vendorcode",
        #"city", "destination", "expensepaymenttype", "flightno",
        #"mileage", "placeofdeparture", "transportation",
        "expensetypename",
        "cid_dbsn", "duration", #"costcenterhistorync", "legalentityhistorync",
        "organizationhistorync",
        "employeehistorync", "cashadvance", "deductamount",
        #"paidbycompany", "totalbalanceofreceivable", "totalpayableamount",
        "totalreimbursementamount",
        "applyexpenseamount",
        #"expenditure", "quantity", "unitprice",
        #"totalexpensequantity",
        "employeerecentexpense"
        #"employeerecentexpensequantity",
        #"ispurchase", "isroundtrip",
        #"date_span"
    ]

    dl_model = H2OAutoEncoderEstimator(hidden = [100, 80, 50, 80, 100],
                                    activation = "Tanh",
                                    variable_importances =True,
                                    epochs = 100,
                                    seed = 123
                                    )

    dl_model.train(
            x               = x,
            training_frame  = df_feature_h2o,
    )

    # Save model into "autoencoder.model"
    print MODEL_PATH
    model_path = h2o.save_model(dl_model, path=MODEL_PATH, force=True)
    sc.parallelize([model_path]).saveAsPickleFile(MODEL_PATH+'model_path')
    print "Daily process >> building DL model: {}".format("done!")

    df_mse_h2o = dl_model.anomaly(df_feature_h2o, per_feature=False)
    df_cbind = df_feature_h2o["uid"].cbind(df_mse_h2o)
    df_tmp = hc.as_spark_frame(df_cbind)
    print "expenseid_mse row numbers = {}".format(df_tmp.count())

    df_prediction = df_feature.join(df_tmp, on= "uid", how= "left_outer")
    df_prediction = df_prediction.withColumnRenamed("Reconstruction.MSE","riskscore")

    # TODO:110 Save combined data into "prediction_history"
    df_prediction = hiveContext.createDataFrame(df_prediction.rdd, df_prediction.schema)
    columns = [
        "uid",
        "allocationtype",
        "appemployeename",
        "applicantlevel",
        "applyexpenseamount",
        "begindate",
        "cashadvance",
        "city",
        "costcentercode",
        "deductamount",
        "destination",
        "enddate",
        "expenditure",
        "expenseid",
        "expensepaymenttype",
        "expensetypename",
        "flightno",
        "ispurchase",
        "isroundtrip",
        "legalentitycode",
        "mileage",
        "organizationcode",
        "paidbycompany",
        "placeofdeparture",
        "quantity",
        "regioncode",
        "reimbursementid",
        "submitted_date",
        "totalbalanceofreceivable",
        "totalpayableamount",
        "totalreimbursementamount",
        "transportation",
        "unitprice",
        "vendorcode",
        "duration",
        # "totalexpensequantity",
        "costcenterhistorync",
        "legalentityhistorync",
        "organizationhistorync",
        "employeehistorync",
        "employeerecentexpense",
        "employeerecentexpensequantity",
        "cid_dbsn",
        "riskscore",
        "isnc"
    ]

    # DONE:100 Save feature importance into table "feature_importance"
    df_feature_imp= hiveContext.createDataFrame(dl_model.varimp(use_pandas = True)) # as_spark_dataframe
    print "df_prediction row numbers = {}".format(df_prediction.count())

    return df_prediction.select(columns), df_feature_imp, dl_model

In [25]:
def get_h2o_features(df_feature, hc):
    df_feature_h2o = hc.as_h2o_frame(df_feature.drop("begindate").drop("enddate"),"featureTable")
    return df_feature_h2o

In [40]:
def get_prediction(df_feature, dl_model, df_feature_h2o, sc, hiveContext, hc, is_realtime):
    #model_path = sc.pickleFile(MODEL_PATH+'model_path').collect()[0]
    #print model_path
    #dl_model = h2o.load_model(model_path)

    # DONE:90 Combine MSE with features as_data_frame(True)
    df_mse_h2o = dl_model.anomaly(df_feature_h2o, per_feature=False)
    df_cbind = df_feature_h2o["uid"].cbind(df_mse_h2o)
    df_tmp = hc.as_spark_frame(df_cbind)
    print "expenseid_mse row numbers = {}".format(df_tmp.count())

    df_prediction = df_feature.join(df_tmp, on= "uid", how= "left_outer")
    df_prediction = df_prediction.withColumnRenamed("Reconstruction.MSE","riskscore")

    # transform data type
    df_prediction = df_prediction.withColumn("begindate",F.to_date(df_prediction["begindate"])) \
        .withColumn("enddate",F.to_date(df_prediction["enddate"]))

    ## Features
    int_variables = [
        "duration",
        "costcenterhistorync",
        "legalentityhistorync",
        "organizationhistorync",
        "employeehistorync"
    ]

    double_variables = [
        "cashadvance",
        "deductamount",
        "paidbycompany",
        "totalbalanceofreceivable",
        "totalpayableamount",
        "totalreimbursementamount",
        "applyexpenseamount",
        "expenditure",
        "quantity",
        "unitprice",
        "employeerecentexpense",
        "employeerecentexpensequantity"
    ]

    bool_variables = [
        "ispurchase",
        "isroundtrip"
    ]

    for tmp_col in int_variables:
        df_prediction = df_prediction.withColumn(tmp_col, df_prediction[tmp_col].cast("int"))

    for tmp_col in double_variables:
        df_prediction = df_prediction.withColumn(tmp_col, df_prediction[tmp_col].cast("double"))

    for tmp_col in bool_variables:
        df_prediction = df_prediction.withColumn(tmp_col, df_prediction[tmp_col].cast("boolean"))

    # TODO:110 Save combined data into "prediction_history"
    df_prediction = df_prediction.sort(df_prediction["riskscore"].desc())
    df_prediction = hiveContext.createDataFrame(df_prediction.rdd,df_prediction.schema)
    columns = [
        "uid",
        "allocationtype",
        "appemployeename",
        "applicantlevel",
        "applyexpenseamount",
        "begindate",
        "cashadvance",
        "city",
        "costcentercode",
        "deductamount",
        "destination",
        "enddate",
        "expenditure",
        "expenseid",
        "expensepaymenttype",
        "expensetypename",
        "flightno",
        "ispurchase",
        "isroundtrip",
        "legalentitycode",
        "mileage",
        "organizationcode",
        "paidbycompany",
        "placeofdeparture",
        "quantity",
        "regioncode",
        "reimbursementid",
        "submitted_date",
        "totalbalanceofreceivable",
        "totalpayableamount",
        "totalreimbursementamount",
        "transportation",
        "unitprice",
        "vendorcode",
        "duration",
        "costcenterhistorync",
        "legalentityhistorync",
        "organizationhistorync",
        "employeehistorync",
        "employeerecentexpense",
        "employeerecentexpensequantity",
        "cid_dbsn",
        "riskscore",
        "isnc"
    ]


    # DONE:100 Save feature importance into table "feature_importance"
    df_feature_imp= hiveContext.createDataFrame(dl_model.varimp(use_pandas = True)) # as_spark_dataframe
    decode = F.udf(decode_string, StringType())
    df_feature_imp = df_feature_imp.withColumn("variable",decode(df_feature_imp["variable"]))
    df_feature_imp.show()

    return (df_prediction.select(columns),df_feature_imp)

In [27]:
df_feature  = hiveContext.table("feature")

In [28]:
df_feature.count()

1000

In [29]:
hc = H2OContext.getOrCreate(sc)

  "parameter of type SparkSession is preferred.")


Connecting to H2O server at http://10.202.27.15:54323... successful.


0,1
H2O cluster uptime:,12 secs
H2O cluster version:,3.10.4.8
H2O cluster version age:,22 days
H2O cluster name:,sparkling-water-huaijianzhang_local-1497244438411
H2O cluster total nodes:,1
H2O cluster free memory:,910 Mb
H2O cluster total cores:,4
H2O cluster allowed cores:,4
H2O cluster status:,"accepting new members, healthy"
H2O connection url:,http://10.202.27.15:54323


In [30]:
hive_df_feature  = hiveContext.table("feature")

In [31]:
l = hive_df_feature.select("uid").distinct().count()
print "unique uid number = {}".format(l)

unique uid number = 1000


In [32]:
df_feature_h2o = get_h2o_features(hive_df_feature, hc)

In [33]:
df_feature_h2o.dim

[1000, 48]

In [34]:
df_feature_h2o.show()

uid,allocationtype,appemployeename,applicantlevel,applyexpenseamount,cashadvance,city,comments,costcentercode,deductamount,deductdemo,destination,expenditure,expensedescription,expenseId,expensepaymenttype,expensetypecode,expensetypename,flightno,ispurchase,isroundtrip,legalentitycode,mileage,organizationcode,paidbycompany,placeofdeparture,purpose,quantity,regioncode,reimbursementId,subject,submitted_date,totalbalanceofreceivable,totalpayableamount,totalreimbursementamount,transportation,unitprice,vendorcode,cid_dbsn,duration,totalExpenseQuantity,costCenterHistoryNC,legalEntityHistoryNC,organizationHistoryNC,employeeHistoryNC,employeeRecentExpense,employeeRecentExpenseQuantity,isnc
2_4|12670|103706,报销单分摊,disenzhang(张东轩),2,3.04452,0,,2016年1月26日-2016年3月25日，交通费报销,21003,0,,,0,,103706,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21003,0,,,1,,12670,,20161019_12,0,4.82831,4.82831,,0,,2_4,1,6,0,46,0,0,4.82831,6,0
2_4|12670|103705,报销单分摊,disenzhang(张东轩),2,3.21888,0,,2016年1月26日-2016年3月25日，交通费报销,21003,0,,,0,,103705,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21003,0,,,1,,12670,,20161019_12,0,4.82831,4.82831,,0,,2_4,1,6,0,46,0,0,4.82831,6,0
2_4|12670|103708,报销单分摊,disenzhang(张东轩),2,3.04452,0,,2016年1月26日-2016年3月25日，交通费报销,21003,0,,,0,,103708,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21003,0,,,1,,12670,,20161019_12,0,4.82831,4.82831,,0,,2_4,1,6,0,46,0,0,4.82831,6,0
2_4|12670|103704,报销单分摊,disenzhang(张东轩),2,2.99573,0,,2016年1月26日-2016年3月25日，交通费报销,21003,0,,,0,,103704,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21003,0,,,1,,12670,,20161019_12,0,4.82831,4.82831,,0,,2_4,1,6,0,46,0,0,4.82831,6,0
2_4|12670|103707,报销单分摊,disenzhang(张东轩),2,3.09104,0,,2016年1月26日-2016年3月25日，交通费报销,21003,0,,,0,,103707,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21003,0,,,1,,12670,,20161019_12,0,4.82831,4.82831,,0,,2_4,1,6,0,46,0,0,4.82831,6,0
2_4|12670|103709,报销单分摊,disenzhang(张东轩),2,3.04452,0,,2016年1月26日-2016年3月25日，交通费报销,21003,0,,,0,,103709,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21003,0,,,1,,12670,,20161019_12,0,4.82831,4.82831,,0,,2_4,1,6,0,46,0,0,4.82831,6,0
2_4|2482|13701,报销单分摊,kevinxhxu(许辉旭),2,3.3673,0,,2015.12-2016.04加班打车报销,21106,0,,,0,,13701,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21106,0,,,1,,2482,,20161019_12,0,7.47647,7.47647,,0,,2_4,1,46,0,46,0,0,7.47647,46,0
2_4|2482|13712,报销单分摊,kevinxhxu(许辉旭),2,3.3673,0,,2015.12-2016.04加班打车报销,21106,0,,,0,,13712,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21106,0,,,1,,2482,,20161019_12,0,7.47647,7.47647,,0,,2_4,1,46,0,46,0,0,7.47647,46,0
2_4|2482|13702,报销单分摊,kevinxhxu(许辉旭),2,2.77259,0,,2015.12-2016.04加班打车报销,21106,0,,,0,,13702,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21106,0,,,1,,2482,,20161019_12,0,7.47647,7.47647,,0,,2_4,1,46,0,46,0,0,7.47647,46,0
2_4|2482|13729,报销单分摊,kevinxhxu(许辉旭),2,4.02535,0,,2015.12-2016.04加班打车报销,21106,0,,,0,,13729,,,市内交通费_夜间及其他市内交通费_夜间交通费,,0,0,T01,,21106,0,,,1,,2482,,20161019_12,0,7.47647,7.47647,,0,,2_4,1,46,0,46,0,0,7.47647,46,0


In [35]:
category_variables = [
        "allocationtype",
        "applicantlevel",
        "legalentitycode",
        "regioncode",
        "appemployeename",
#        "costcentercode",
        "organizationcode",
        "vendorcode",
        "city",
        "destination",
        "expensepaymenttype",
        "flightno",
        "mileage",
        "placeofdeparture",
        "transportation",
        "expensetypename",
        "cid_dbsn"
    ]

    int_variables = [
        "duration",
#        "costcenterhistorync",
#        "legalentityhistorync",
#        "organizationhistorync",
#        "employeehistorync"
    ]

    double_variables = [
        "cashadvance",
        "deductamount",
        "paidbycompany",
        "totalbalanceofreceivable",
        "totalpayableamount",
        "totalreimbursementamount",
        "applyexpenseamount",
        "expenditure",
        "quantity",
        "unitprice",
#        "totalexpensequantity",
#        "employeerecentexpense",
#        "employeerecentexpensequantity"
    ]

    bool_variables = [
        "ispurchase",
        "isroundtrip"
    ]

    date_variables = [
        "begindate",
        "enddate"
    ]

    x = []
    x.extend(category_variables)
    x.extend(int_variables)
    x.extend(double_variables)
    x.extend(bool_variables)

In [271]:
# autoencoder
dl_model = H2OAutoEncoderEstimator(hidden = [200, 100, 50, 100, 200],
                                activation = "Tanh",
                                variable_importances =True,
                                epochs = 10,
                                seed = 123
                                )
dl_model.train(
        x               = x,
        training_frame  = df_feature_h2o,
)

df_prediction, df_feature_imp = get_prediction(df_feature, dl_model, df_feature_h2o, sc, hiveContext, hc, 0)
#print df_feature_h2o.describe()
df_prediction, df_feature_imp = get_prediction(df_feature, dl_model, df_feature_h2o, sc, hiveContext, hc, 0)

print "df_prediction row numbers = {}".format(df_prediction.count())

df_prediction.write.insertInto("prediction_history", overwrite= True)
df_feature_imp.write.insertInto("feature_importance", overwrite = True)

print "Daily process >> save important features: {}".format("done!")

deeplearning Model Build progress: |██████████████████████████████████████| 100%
expenseid_mse row numbers = 1000
+--------------------+-------------------+------------------+-------------------+
|            variable|relative_importance| scaled_importance|         percentage|
+--------------------+-------------------+------------------+-------------------+
|        deductamount|                1.0|               1.0|0.22254268262497925|
|  applyexpenseamount| 0.9618339538574219|0.9618339538574219|0.21404910833122118|
|            duration| 0.9356208443641663|0.9356208443641663|0.20821557262464976|
|totalreimbursemen...| 0.8018185496330261|0.8018185496330261| 0.1784388510138037|
|  totalpayableamount| 0.7942466735839844|0.7942466735839844|0.17675378540534611|
+--------------------+-------------------+------------------+-------------------+



In [252]:
from pyspark.sql.functions import rand,when
df_feature_label = df_feature.withColumn('label', when(rand() > 0.7, 1).otherwise(0))
df_feature_label_h2o = hc.as_h2o_frame(df_feature_label)
y = "label"

In [268]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from h2o.estimators.deeplearning import H2OAutoEncoderEstimator, H2ODeepLearningEstimator
from h2o.estimators.gbm import H2OGradientBoostingEstimator
from h2o.estimators.glm import H2OGeneralizedLinearEstimator
from h2o.estimators.random_forest import H2ORandomForestEstimator

dl_model2 = H2ODeepLearningEstimator(hidden = [200, 100, 50, 100, 200],
                                activation = "Tanh",
                                variable_importances = True,
                                epochs = 10,
                                seed = 123)
dl_model2.train(
        x               = x,
        y               = y,
        training_frame  = df_feature_label_h2o,
)
predicted = dl_model2.predict(test_data=df_feature_label_h2o)

deeplearning Model Build progress: |██████████████████████████████████████| 100%
deeplearning prediction progress: |███████████████████████████████████████| 100%


In [270]:
gb_model = H2OGradientBoostingEstimator()
gb_model.train(
        x               = x,
        y               = y,
        training_frame  = df_feature_label_h2o,
)
predicted = gb_model.predict(test_data=df_feature_label_h2o)

gbm Model Build progress: |███████████████████████████████████████████████| 100%
gbm prediction progress: |████████████████████████████████████████████████| 100%


[1000, 1]

expenseid_mse row numbers = 1000
+--------------------+-------------------+------------------+-------------------+
|            variable|relative_importance| scaled_importance|         percentage|
+--------------------+-------------------+------------------+-------------------+
|        deductamount|                1.0|               1.0|0.21874546106078435|
|  applyexpenseamount| 0.9875599145889282|0.9875599145889282| 0.2160242488419039|
|            duration|  0.949902355670929| 0.949902355670929| 0.2077868287539625|
|  totalpayableamount| 0.8233636617660522|0.8233636617660522| 0.1801070638137108|
|totalreimbursemen...| 0.8106974959373474|0.8106974959373474| 0.1773363975296384|
+--------------------+-------------------+------------------+-------------------+



expenseid_mse row numbers = 1000


AnalysisException: u'Cannot resolve column name "_1" among (variable, relative_importance, scaled_importance, percentage);'

In [40]:
import tensorflow as tf
from tensorflow.python.framework import dtypes

In [38]:
df_feature.take(1)

[Row(uid=u'2_4|12670|103706', allocationtype=u'\u62a5\u9500\u5355\u5206\u644a', appemployeename=u'disenzhang(\u5f20\u4e1c\u8f69)', applicantlevel=u'2', applyexpenseamount=3.044522437723423, begindate=datetime.date(2016, 3, 7), cashadvance=0.0, city=u'', comments=u'2016\u5e741\u670826\u65e5-2016\u5e743\u670825\u65e5\uff0c\u4ea4\u901a\u8d39\u62a5\u9500', costcentercode=u'21003', deductamount=0.0, deductdemo=u'null', destination=u'', enddate=datetime.date(2016, 3, 7), expenditure=0.0, expensedescription=u'', expenseId=u'103706', expensepaymenttype=u'', expensetypecode=u'', expensetypename=u'\u5e02\u5185\u4ea4\u901a\u8d39_\u591c\u95f4\u53ca\u5176\u4ed6\u5e02\u5185\u4ea4\u901a\u8d39_\u591c\u95f4\u4ea4\u901a\u8d39', flightno=u'', ispurchase=False, isroundtrip=False, legalentitycode=u'T01', mileage=u'', organizationcode=u'21003', paidbycompany=0.0, placeofdeparture=u'', purpose=u'', quantity=1.0, regioncode=u'', reimbursementId=u'12670', subject=u'', submitted_date=u'20161019_12', totalbalanc

In [144]:
def encoder(x):
    # Encoder Hidden layer with sigmoid activation #1
    layer_1 = tf.nn.sigmoid(tf.add(tf.matmul(x, weights['encoder_h1']),
                                   biases['encoder_b1']))
    # Decoder Hidden layer with sigmoid activation #2
    layer_2 = tf.nn.sigmoid(tf.add(tf.matmul(layer_1, weights['encoder_h2']),
                                   biases['encoder_b2']))

    layer_3 = tf.nn.sigmoid(tf.add(tf.matmul(layer_2, weights['encoder_h3']),
                                   biases['encoder_b3']))
    return layer_3

# Building the decoder
def decoder(x):
    # Encoder Hidden layer with sigmoid activation #0
    layer_0 = tf.nn.sigmoid(tf.add(tf.matmul(x, weights['decoder_h0']),
                                   biases['decoder_b0']))
    # Encoder Hidden layer with sigmoid activation #1
    layer_1 = tf.nn.sigmoid(tf.add(tf.matmul(layer_0, weights['decoder_h1']),
                                   biases['decoder_b1']))
    # Decoder Hidden layer with sigmoid activation #2
    layer_2 = tf.nn.sigmoid(tf.add(tf.matmul(layer_1, weights['decoder_h2']),
                                   biases['decoder_b2']))
    return layer_2

In [145]:
# Construct model
encoder_op = encoder(X)
decoder_op = decoder(encoder_op)

# Prediction
y_pred = decoder_op

# Targets (Labels) are the input data.
y_true = X

In [150]:
if cost == "mse":
    cost = tf.reduce_mean(tf.pow(y_true - y_pred, 2))
optimizer = tf.train.RMSPropOptimizer(learning_rate).minimize(cost)

# Initializing the variables
init = tf.global_variables_initializer()

In [154]:
sess = tf.Session()
sess.run(init)

In [155]:
_, c = sess.run([optimizer, cost], feed_dict={X: input_df_feature})

In [156]:
type(df_feature)

pyspark.sql.dataframe.DataFrame

In [74]:
import pandas as pd

In [115]:
pandas_df_feature = df_feature.toPandas()

uid is not float
allocationtype is not float
appemployeename is not float
+ applicantlevel is included
+ applyexpenseamount is included
begindate is not float
+ cashadvance is included
city is not float
comments is not float
costcentercode is not float
+ deductamount is included
deductdemo is not float
destination is not float
enddate is not float
+ expenditure is included
expensedescription is not float
+ expenseId is included
expensepaymenttype is not float
expensetypecode is not float
expensetypename is not float
flightno is not float
+ ispurchase is included
+ isroundtrip is included
legalentitycode is not float
mileage is not float
organizationcode is not float
+ paidbycompany is included
placeofdeparture is not float
purpose is not float
+ quantity is included
regioncode is not float
+ reimbursementId is included
subject is not float
submitted_date is not float
+ totalbalanceofreceivable is included
+ totalpayableamount is included
+ totalreimbursementamount is included
transport

In [157]:
_, c = sess.run([optimizer, cost], feed_dict={X: input_df_feature})

2.394297e+08