In [1]:
''' 
Basic  system Set up
'''
import time
from time import gmtime, strftime 
timetmp = strftime("%Y-%m-%d %H:%M:%S", gmtime())
appname = 'Pyspark_' + timetmp

import os
## update the pyspark config
os.environ["LD_LIBRARY_PATH"]="/opt/rh/python27/root/usr/lib64"
os.environ["PYSPARK_PYTHON"]="/dsap/devl/private/common/python2.7/bin/python"
#”/usr/hdp/current/spark2-client” is spark2
os.environ["SPARK_HOME"]="/usr/hdp/current/spark2-client" 
os.environ["PYSPARK_DRIVER_PYTHON"]="/dsap/devl/private/common/python2.7/bin/jupyter"
os.environ["PYTHONPATH"]="/usr/hdp/current/spark2-client/python:/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip"


from pyspark import SparkContext, SparkConf
conf=(SparkConf().setMaster("yarn-client").setAppName(appname)
      .set("spark.yarn.queue", "BIA-support")
      .set("spark.num.executors","20")
      .set("spark.driver.cores","8") # not sure if this can help improve speed
      .set("spark.executor.memory", "20g")
     #.set("spark.executor.instances","20") # not sure if this can help improve speed
     #.set("spark.shuffle.service.enabled","true") # not sure if this can help improve speed
     #.set("spark.dynamicAllocation.enabled","true") # not sure if this can help improve speed
     #.set("spark.dynamicAllocation.minExecutors","5") # not sure if this can help improve speed
     )

sc=SparkContext(conf=conf)
from pyspark.sql import HiveContext
hiveContext=HiveContext(sc) 

print"done"

done


** import data with missing values filled with 0s **

In [2]:
# only use the top 100 merchants
get_query = 'select acct_key, mrch_brnd_id, tran_cnt from s_card_marketinganalytics.wfeng_top100_mrch_full'
dataframe = hiveContext.sql(get_query)

In [3]:
train, test = dataframe.randomSplit([0.8,0.2],seed=0L)

In [4]:
train.cache()
test.cache()

DataFrame[acct_key: int, mrch_brnd_id: int, tran_cnt: int]

In [5]:
# delete the dataframe to release some memory
import gc
del dataframe
gc.collect()

334

** Hu's Method **
- the simplist model prduces auc of 0.53 with training time 1.5 hours
- need to tune parameters to improve performance
- need to think about how to address this imbalanced classification problem (majority are 0s, because the objective function is to minimize total squared errors, it tends to predict more 0s to minimize the objective function)

In [6]:
from pyspark.ml.recommendation import ALS
import math

als = ALS(rank=10, maxIter=10, regParam=1.0, alpha=1.0, numUserBlocks=10, numItemBlocks=10, 
          implicitPrefs=True, nonnegative=False, 
          userCol="acct_key", itemCol="mrch_brnd_id", ratingCol="tran_cnt", 
          #checkpointInterval=10, 
          seed=1L, 
          intermediateStorageLevel="DISK_ONLY_2", finalStorageLevel="MEMORY_AND_DISK")

# %time model = als.fit(train) # this doesn't save the model to memory when intermediateStorageLevel="DISK_ONLY_2"
# only works when intermediateStorageLevel="MEMORY_AND_DISK"

In [7]:
strt_time = time.time()

model = als.fit(train)

end_time = time.time()
print"{0:.0f} seconds" .format(end_time - strt_time)

4727 seconds


In [8]:
pred = model.transform(test).cache()
# because we have filled all missing entries with 0s, there will be no NaN values in the predicted results

In [30]:
pred.show(10)

+--------+------------+--------+-------------+
|acct_key|mrch_brnd_id|tran_cnt|   prediction|
+--------+------------+--------+-------------+
|    7417|         137|       0|   0.15832238|
|   12006|         137|       0|   0.08355707|
|  115363|         137|       0|   0.10356664|
|  116319|         137|       0|-0.0010002964|
|  203794|         137|       0|  0.040202715|
|  219972|         137|       0|   0.06458692|
|  225757|         137|       0| 0.0041738767|
|  225866|         137|       2|          0.0|
|  271143|         137|       0|  0.042408902|
|  274111|         137|       0|   0.02704041|
+--------+------------+--------+-------------+
only showing top 10 rows



In [29]:
strt_time = time.time()

# the predicted values are not the probability of a transaction, therefore, the range are not [0.1], but could be > 1 or < 0
max_score = pred.select("prediction").rdd.max()[0]
min_score = pred.select("prediction").rdd.min()[0]

print "max score: %.2f" % max_score
print "min score: %.2f" % min_score

end_time = time.time()
print"{0:.0f} seconds" .format(end_time - strt_time)

max score: 2.11
min score: -1.14
1652 seconds


In [57]:
# because Hu's method output scores are not probability of tran_cnt = 1, we need to map it to range [0,1]
# and then apply cutoff threshold to convert it to binary outcome

cutoff_threshold = 0.5 # this is the cutoff threshold we want to use for the probability in the range of [0,1]

# this is the corresponding cutoff threshold for the scores
adj_threshold = cutoff_threshold * (max_score - min_score) + min_score 

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType, IntegerType, DoubleType

udf_score = udf(lambda x: 1.00 if x >= adj_threshold else 0.00, DoubleType()) #Define UDF function

%time pred = pred.withColumn('pred_binary',udf_score('prediction'))

CPU times: user 3 ms, sys: 2 ms, total: 5 ms
Wall time: 14.7 ms


In [58]:
pred.show(10)

+--------+------------+--------+-------------+-----------+----------+
|acct_key|mrch_brnd_id|tran_cnt|   prediction|pred_binary|obs_binary|
+--------+------------+--------+-------------+-----------+----------+
|    7417|         137|       0|   0.15832238|        0.0|      null|
|   12006|         137|       0|   0.08355707|        0.0|      null|
|  115363|         137|       0|   0.10356664|        0.0|      null|
|  116319|         137|       0|-0.0010002964|        0.0|      null|
|  203794|         137|       0|  0.040202715|        0.0|      null|
|  219972|         137|       0|   0.06458692|        0.0|      null|
|  225757|         137|       0| 0.0041738767|        0.0|      null|
|  225866|         137|       2|          0.0|        0.0|      null|
|  271143|         137|       0|  0.042408902|        0.0|      null|
|  274111|         137|       0|   0.02704041|        0.0|      null|
+--------+------------+--------+-------------+-----------+----------+
only showing top 10 

In [59]:
# convert raw tran_cnt into binary data

udf_tran_cnt = udf(lambda x: 1.00 if x > 0 else 0.00, DoubleType()) #Define UDF function

%time pred = pred.withColumn('obs_binary',udf_tran_cnt('tran_cnt'))

CPU times: user 1 ms, sys: 0 ns, total: 1 ms
Wall time: 8.77 ms


In [60]:
pred.show(10)

+--------+------------+--------+-------------+-----------+----------+
|acct_key|mrch_brnd_id|tran_cnt|   prediction|pred_binary|obs_binary|
+--------+------------+--------+-------------+-----------+----------+
|    7417|         137|       0|   0.15832238|        0.0|       0.0|
|   12006|         137|       0|   0.08355707|        0.0|       0.0|
|  115363|         137|       0|   0.10356664|        0.0|       0.0|
|  116319|         137|       0|-0.0010002964|        0.0|       0.0|
|  203794|         137|       0|  0.040202715|        0.0|       0.0|
|  219972|         137|       0|   0.06458692|        0.0|       0.0|
|  225757|         137|       0| 0.0041738767|        0.0|       0.0|
|  225866|         137|       2|          0.0|        0.0|       1.0|
|  271143|         137|       0|  0.042408902|        0.0|       0.0|
|  274111|         137|       0|   0.02704041|        0.0|       0.0|
+--------+------------+--------+-------------+-----------+----------+
only showing top 10 

In [61]:
pred.printSchema()

root
 |-- acct_key: integer (nullable = true)
 |-- mrch_brnd_id: integer (nullable = true)
 |-- tran_cnt: integer (nullable = true)
 |-- prediction: float (nullable = true)
 |-- pred_binary: double (nullable = true)
 |-- obs_binary: double (nullable = true)



In [62]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="pred_binary", labelCol="obs_binary",
                                          metricName="areaUnderROC")
auc = evaluator.evaluate(pred)
print "Are under ROC: %.2f" % auc

Are under ROC: 0.53


-- this very low ROC may be due to the imbalanced classification problem, where majority of the observations are 0s, very few are 1s, so the optimization tends to predict every outcome to be 0

In [91]:
%time tot_obs = pred.select("obs_binary").groupBy().sum().collect()[0][0]
print "total positive observations: %d" % tot_obs

total positive observations: 17226369


In [90]:
%time tot_pred = pred.count()
print "total observations: %d" % tot_pred
print "%% of positive observations: %d%%" % (tot_obs / tot_pred * 100)

total observations: 301221717
% of positive observations: 5%


In [93]:
%time pos_pred = pred.select("pred_binary").groupBy().sum().collect()[0][0]
print "total positive predictions: %d" % pos_pred
print "%% of positive observations: %.2f%%" % (pos_pred / tot_pred * 100)

total positive predictions: 1865148
% of positive observations: 0.62%


** Non-Negative Matrix Factorization **
- I have tested rank = 10 (1.5 hr) and rank = 30 (3.5 hr), neither predicted any positive numbers (all 0s)
- it seems that this method itself is not a good choice, may be helpful when turn on this in Hu's method, which will potentially slows down Hu's method

In [98]:
from pyspark.ml.recommendation import ALS
import math

als_nmf = ALS(rank=10, maxIter=10, regParam=1.0, alpha=1.0, numUserBlocks=10, numItemBlocks=10, 
          implicitPrefs=False, nonnegative=True, 
          userCol="acct_key", itemCol="mrch_brnd_id", ratingCol="tran_cnt", 
          checkpointInterval=10, seed=1L, 
          intermediateStorageLevel="DISK_ONLY_2", finalStorageLevel="MEMORY_AND_DISK")

# %time model = als.fit(train) # this doesn't save the model to memory when intermediateStorageLevel="DISK_ONLY_2"
# only works when intermediateStorageLevel="MEMORY_AND_DISK"

In [99]:
strt_time = time.time()

model_nmf = als_nmf.fit(train)

end_time = time.time()
print"{0:.0f} seconds" .format(end_time - strt_time)

4543 seconds


In [101]:
pred_nmf = model_nmf.transform(test).cache()

In [117]:
%time max_pred_nmf = pred_nmf.select("prediction").rdd.max()[0]
print max_pred_nmf

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job 80 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:806)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1668)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1587)
	at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1833)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1832)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:108)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


-- unfortunately, it did not predict any positive tran_cnt

In [104]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_nmf = RegressionEvaluator(metricName="rmse", labelCol="tran_cnt",
                                    predictionCol="prediction")
% time rmse = evaluator_nmf.evaluate(pred_nmf)
print "Root mean squared error: %.2f " % rmse

Root mean squared error: 1.52 


-- now let's test rank = 30

In [105]:
als_nmf2 = ALS(rank=30, maxIter=10, regParam=1.0, alpha=1.0, numUserBlocks=10, numItemBlocks=10, 
               implicitPrefs=False, nonnegative=True, 
               userCol="acct_key", itemCol="mrch_brnd_id", ratingCol="tran_cnt", 
               checkpointInterval=10, seed=1L, 
               intermediateStorageLevel="MEMORY_AND_DISK", finalStorageLevel="MEMORY_AND_DISK")

%time model_nmf2 = als_nmf2.fit(train)

CPU times: user 707 ms, sys: 408 ms, total: 1.11 s
Wall time: 3h 31min 2s


In [None]:
%time pred_nmf2 = model_nmf2.transform(test).cache()

In [111]:
%time max_pred_nmf2 = pred_nmf2.select("prediction").rdd.max()[0]
print max_pred_nmf2

CPU times: user 69 ms, sys: 27 ms, total: 96 ms
Wall time: 11min 12s
0.0


-- again, no positive predictions

** Hu's method parameter tuning **

In [None]:
# convert raw tran_cnt into binary data
%time train = train.withColumn('tran_cnt_binary',udf_tran_cnt('tran_cnt'))

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

als_hu = ALS()

# parameter setting for initial tuning
paramgrid = ParamGridBuilder() \
    .baseOn([als.numUserBlocks,-1]) \
    .baseOn([als.numItemBlocks,-1]) \
    .baseOn([als.userCol,"acct_key"]) \
    .baseOn([als.itemCol,"mrch_brnd_id"]) \
    .baseOn([als.ratingCol,"tran_cnt"])\
    .baseOn([als.implicitPrefs, True]) \
    .baseOn([als.nonnegative,True]) \
    .baseOn([als.seed,1L]) \
    .baseOn([als.intermediateStorageLevel, "MEMORY_AND_DISK"]) \
    .baseOn([als.finalStorageLevel,"MEMORY_AND_DISK"]) \
    .addGrid(als.rank, [30,50]) \
    .addGrid(als.regParam, [1.0,10.0]) \
    .addGrid(als.maxIter, 30) \
    .addGrid(als.alpha, [0.1, 0.5]) \
    .build()

eval_hu = BinaryClassificationEvaluator(rawPredictionCol="pred_binary", labelCol="tran_cnt_binary",
                                        metricName="areaUnderROC")
# how to create "pred_binary" column here? We did not specify the predicton dataframe.

# using RegressionEvaluator RMSE between tran_cnt and score seems not a reasonable choice

grid_hu = TrainValidationSplit(estimator=als_hu, estimatorParamMaps=paramgrid, evaluator=eval_hu, trainRatio=0.8)
model_hu = grid_hu.fit(train) # we should use training data here instead of dataframe, otherwise we are vulnerable to overfitting


In [None]:
# extract the best model from model_hu
# test performance on test data