In [2]:
# NOTE: import order is kept as-is on purpose — the two star imports make the
# order significant (a later import wins when names collide).
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.functions import vector_to_array

from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from math import pi

from pyspark.ml.classification import GBTClassificationModel, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt

In [3]:
# Create (or reuse) the Spark session for the 'amex' application.
# NOTE(review): spark.memory.offHeap.size is set but spark.memory.offHeap.enabled
# is not set here — confirm whether off-heap memory is actually meant to be used.
spark = (
    SparkSession.builder
    .appName('amex')
    .config('spark.driver.memory', '192g')
    .config('spark.executor.memory', '192g')
    .config('spark.executor.cores', '32')
    .config('spark.driver.cores', '32')
    .config('spark.memory.offHeap.size', '192g')
    .config('spark.executor.memoryOverhead', '10g')
    .getOrCreate()
)
    

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/17 15:36:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Read the training CSV (first row is the header, column types are inferred)
# and sort the rows by customer, then by the S_2 timestamp.
df = (
    spark.read
    .csv('train_data.csv', header=True, inferSchema=True)
    .orderBy('customer_ID', 'S_2')
)

                                                                                

In [8]:
# Print the column names and the types that inferSchema assigned to them.
df.printSchema()

root
 |-- customer_ID: string (nullable = true)
 |-- S_2: timestamp (nullable = true)
 |-- P_2: double (nullable = true)
 |-- D_39: double (nullable = true)
 |-- B_1: double (nullable = true)
 |-- B_2: double (nullable = true)
 |-- R_1: double (nullable = true)
 |-- S_3: double (nullable = true)
 |-- D_41: double (nullable = true)
 |-- B_3: double (nullable = true)
 |-- D_42: double (nullable = true)
 |-- D_43: double (nullable = true)
 |-- D_44: double (nullable = true)
 |-- B_4: double (nullable = true)
 |-- D_45: double (nullable = true)
 |-- B_5: double (nullable = true)
 |-- R_2: double (nullable = true)
 |-- D_46: double (nullable = true)
 |-- D_47: double (nullable = true)
 |-- D_48: double (nullable = true)
 |-- D_49: double (nullable = true)
 |-- B_6: double (nullable = true)
 |-- B_7: double (nullable = true)
 |-- B_8: double (nullable = true)
 |-- D_50: double (nullable = true)
 |-- D_51: double (nullable = true)
 |-- B_9: double (nullable = true)
 |-- R_3: double (nullable 

In [4]:
# Feature bookkeeping: every column except the customer key and the S_2
# timestamp is a feature; the listed ones are categorical, the rest numeric.
all_cols = [c for c in df.columns if c not in ['customer_ID', 'S_2']]
cat_features = ["B_30", "B_38", "D_114", "D_116", "D_117", "D_120", "D_126", "D_63", "D_64", "D_66", "D_68"]
num_features = [name for name in all_cols if name not in cat_features]
num_diff_features = [name + '_diff' for name in num_features]

# One group per customer; reused by the categorical aggregation as well.
test_num_group = df.groupBy("customer_ID")

# Per-customer summary statistics for every numeric feature.
# `min` / `max` here are the Spark SQL functions from the star import, not the
# Python builtins.
# "<feature>_last" is taken as the value on the row with the greatest S_2:
# max() over struct(S_2, value) orders by S_2 first, so it does not depend on
# the physical row order the way last() does after the groupBy shuffle.
test_num_agg = test_num_group.agg(
    *[mean(name).alias(f"{name}_mean") for name in num_features],
    *[stddev(name).alias(f"{name}_std") for name in num_features],
    *[min(name).alias(f"{name}_min") for name in num_features],
    *[max(name).alias(f"{name}_max") for name in num_features],
    *[max(struct("S_2", name)).getField(name).alias(f"{name}_last") for name in num_features]
)

In [None]:
# Display the first five rows of the per-customer numeric aggregates.
test_num_agg.show(5)

In [None]:
# Rebind `df` to just the customer key and the S_2 timestamp.
# NOTE(review): after this cell `df` no longer exposes any feature column, so
# anything that later needs row-level features cannot get them from `df` —
# confirm this narrowing is intended.
df = df.select("customer_ID", "S_2")

In [None]:
def lag_diff(col_values):
    """Return the first differences of a sequence, padded with a leading 0.0.

    Element ``i`` (for ``i >= 1``) of the result is
    ``col_values[i] - col_values[i - 1]``; element 0 is ``0.0`` because the
    first value has no predecessor. The result therefore has the same length
    as the input.

    Args:
        col_values: An indexable, sized sequence of numbers (e.g. a list).

    Returns:
        A list with one entry per input element; an empty list for empty
        input (previously a stray ``[0.0]`` was returned in that case).
    """
    size = len(col_values)
    if size == 0:
        # Nothing to diff: keep the output aligned with the (empty) input.
        return []
    return [0.0] + [col_values[i] - col_values[i - 1] for i in range(1, size)]

# Derived per-customer features, computed from the aggregate columns that
# test_num_agg already holds (<feature>_min/_max/_mean/_last).
# The expressions are collected first and added with ONE select: calling
# withColumn once per new column makes the query plan grow with every call.
derived = {}
for cc in num_features:
    derived[f"{cc}_max_min_diff"] = col(f"{cc}_max") - col(f"{cc}_min")
    derived[f"{cc}_last_mean_diff"] = (col(f"{cc}_last") - col(f"{cc}_mean")).cast("float")
    derived[f"{cc}_last_mean_ratio"] = (col(f"{cc}_last") / col(f"{cc}_mean")).cast("float")
    derived[f"{cc}_min_max_ratio"] = (col(f"{cc}_min") / col(f"{cc}_max")).cast("float")

# Per-row "<feature>_diff" columns are deliberately NOT built here: this frame
# has one row per customer and no raw <feature> column, and lag_diff works on a
# Python sequence, not on a Spark Column, so the previous
# `lag_diff(col(cc))` call could only raise.

# Skip any earlier copy of the derived columns so re-running the cell replaces
# them (as withColumn did) instead of producing duplicate column names.
kept = [name for name in test_num_agg.columns if name not in derived]
test_num_agg = test_num_agg.select(
    *kept,
    *[expr.alias(name) for name, expr in derived.items()],
)

In [4]:


# Dataframe for diff features: for every numeric feature, the change from the
# customer's previous row (ordered by S_2), with 0.0 on the customer's first row.
# NOTE(review): this reads the row-level feature columns from `df`; another cell
# in this notebook rebinds `df` to only customer_ID/S_2 — confirm `df` is still
# the full row-level frame when this cell runs.
diff_window = Window.partitionBy("customer_ID").orderBy("S_2")
row_in_customer = row_number().over(diff_window)
df_diff = df.select(
    "customer_ID",
    *[
        when(row_in_customer == 1, lit(0.0))
        .otherwise(col(name) - lag(name).over(diff_window))
        .cast("float")
        .alias(f"{name}_diff")
        for name in num_features
    ],
)

# Per-customer statistics for the categorical features.
# "<feature>_last" is the value on the row with the greatest S_2 (max over
# struct(S_2, value)), which does not depend on physical row order the way
# last() does after the groupBy shuffle.
test_cat_agg = test_num_group.agg(
    *[count(name).alias(f"{name}_count") for name in cat_features],
    *[max(struct("S_2", name)).getField(name).alias(f"{name}_last") for name in cat_features],
    *[countDistinct(name).alias(f"{name}_nunique") for name in cat_features]
)

# Largest row-to-row change per customer for every numeric feature.
# `max` is the Spark SQL function from the star import (no `F` alias exists
# in this notebook).
test_num_diff_agg = df_diff.groupBy("customer_ID").agg(
    *[max(name).alias(f"{name}_max") for name in num_diff_features]
)

# Assemble the final per-customer frame: numeric + categorical + diff features.
df = (
    test_num_agg
    .join(test_cat_agg, "customer_ID", "inner")
    .join(test_num_diff_agg, "customer_ID", "inner")
)


print('shape after engineering', df.count(), len(df.columns))


In [9]:
# Gradient-boosted tree classifier.
# NOTE(review): assumes `df` carries a 'target' label column and a
# 'pca_features' vector column — TODO confirm which cell produces them.
gbt = GBTClassifier(labelCol='target', featuresCol='pca_features', maxIter=10)

# Hyper-parameter grid: 1 x 2 x 2 x 2 = 8 combinations.
# build() has to be the last call in the chain — it returns the finished list
# of param maps, so nothing can be added after it.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2])
    .addGrid(gbt.maxBins, [20, 60])
    .addGrid(gbt.maxIter, [10, 20])
    .addGrid(gbt.stepSize, [0.1, 0.2])
    .build()
)

# 3-fold cross-validation scored by area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol='target', metricName='areaUnderROC')
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)
cvModel = cv.fit(df)

# Show every parameter of the winning model.
cvModel.bestModel.extractParamMap()

                                                                                

{Param(parent='GBTClassifier_c085688d38e8', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='GBTClassifier_c085688d38e8', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='GBTClassifier_c085688d38e8', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for r

In [10]:
# Tree depth of the best model found by cross-validation.
cvModel.bestModel.getMaxDepth()

2

In [11]:
# Persist the best cross-validated model under 'gbt_model', replacing any
# copy that was saved there before.
cvModel.bestModel.write().overwrite().save('gbt_model')

In [3]:
# Load the training data together with its labels.
# NOTE(review): load_data is not defined in any visible cell of this notebook —
# confirm where it comes from and what its second and third return values
# (c1, c2) hold; they are not used by the visible cells.
df,c1,c2 = load_data('train_data.csv','train_labels.csv')

                                                                                

23/07/17 00:19:19 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 147:>                                                     (0 + 40) / 200]

23/07/17 00:21:56 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/07/17 00:21:56 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/07/17 00:22:09 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/07/17 00:22:09 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


                                                                                

In [8]:
# Load the test data (no labels file is passed here).
# NOTE(review): load_data is not defined in any visible cell of this notebook —
# confirm its source; c1 and c2 are overwritten here and not used afterwards in
# the visible cells.
test_df,c1,c2 = load_data('test_data.csv')

                                                                                ]

In [5]:
# Reload the persisted GBT model (GBTClassificationModel is imported in the
# notebook's import cell rather than here, so all dependencies live in one place).
gbt_model = GBTClassificationModel.load('gbt_model')

# Score the test frame and keep only the customer key and the predicted label.
pred = gbt_model.transform(test_df).select('customer_ID', 'prediction')

In [9]:
# Accuracy: share of rows whose predicted label equals the true label.
# NOTE(review): this needs 'idx', 'target' and 'probability' on `pred`, while
# the model-scoring cell in this notebook keeps only customer_ID/prediction —
# confirm which frame this cell was run against.
wanted = ['idx', 'target', 'probability', 'prediction']
pred = pred.select(*wanted)
total = pred.count()
correct = pred.where(pred['target'] == pred['prediction']).count()
print('Accuracy: ', correct / total)



Accuracy:  0.8474727517246379


                                                                                

In [6]:
# Convert the Spark predictions to a pandas DataFrame so they can be written
# out with pandas in the next cell.
pred = pred.toPandas()


                                                                                ]

In [8]:
# Store the predicted label as an integer, then write the frame to 'pred.csv'
# without the pandas index column.
pred['prediction'] = pred.prediction.astype(int)
pred.to_csv(path_or_buf='pred.csv', index=False)