In [1]:
%pylab inline

Populating the interactive namespace from numpy and matplotlib


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
from time import time

spark = SparkSession.builder.master('local[*]').appName('Multiclass Classification: TBank').config("spark.sql.warehouse.dir", "/home/maffsojah/Projects/HIT_400/capstone_project/web/tbank/spark-warehouse").getOrCreate()

In [41]:
# Load train data : 1 million rows
TrainSet = spark.read.csv('hdfs://localhost:9000/user/hduser/datasets/oneMill.csv', header='true', inferSchema='true')
#TrainSet = spark.read.csv('../datasets/test.csv', header='true', inferSchema='true')
#TrainSet = spark.read.csv('../datasets/predict.csv', header='true', inferSchema='true')

In [42]:
# Creating Spark SQL temporary views with the DataFrames
## Train View
TrainSet.createOrReplaceTempView("customers")
# TrainSet.createOrReplaceTempView("predict")


# SQL can be run over DataFrames that have been registered as a table.
## Train
results = spark.sql("SELECT Gender, Account_Type, Age, Education, Employment, Salary, Employer_Stability, Customer_Loyalty, Balance, Residential_Status, Service_Level FROM customers")

# results = spark.sql("SELECT Gender, Account_Type, Age, Education, Employment, Salary, Employer_Stability, Customer_Loyalty, Balance, Residential_Status, Service_Level FROM predict")


results.show()

+------+---------------+-------+--------------------+----------+------+------------------+----------------+-------+------------------+-------------+
|Gender|   Account_Type|    Age|           Education|Employment|Salary|Employer_Stability|Customer_Loyalty|Balance|Residential_Status|Service_Level|
+------+---------------+-------+--------------------+----------+------+------------------+----------------+-------+------------------+-------------+
|  Male|Current Account|   60 +|Highschool and below|  Contract| 29633|            Stable|               8|    386|             Owned|          1.0|
|  Male|Current Account|   60 +|Highschool and below|   Student|  5622|            Stable|               0|    386|             Owned|          0.0|
|  Male|Savings Account|   60 +|  Tertiary and above|  Contract|  5622|          Unstable|               5| 269684|             Owned|          0.0|
|Female|Current Account|18 - 35|Highschool and below| Permanent|  5622|            Stable|               5

In [43]:
#results columns: trainset
cols = results.columns


## tests columns
#testcols = tests.columns

display(results)

DataFrame[Gender: string, Account_Type: string, Age: string, Education: string, Employment: string, Salary: int, Employer_Stability: string, Customer_Loyalty: int, Balance: int, Residential_Status: string, Service_Level: double]

In [44]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["Gender", "Account_Type", "Age","Education", "Employment", "Employer_Stability", "Residential_Status"]
stages = [] # stages in the pipeline

# "Gender","Account_Type",

for categoricalCol in categoricalColumns:
    
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
    
    # Using OneHotEncoder to convert categorical variables into binary SparseVectors
    #encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(), outputCol=categoricalCol+"classVec")
    
    # Adding the stages: will be run all at once later on
    stages += [stringIndexer, encoder]
    
# convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "Service_Level", outputCol = "label")
stages += [label_stringIdx]

# Transform all features into a vector using VectorAssembler
numericCols = ["Salary", "Customer_Loyalty", "Balance"]
assemblerInputs = map(lambda c: c + "classVec", categoricalColumns) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# Creating a Pipeline for Training
pipeline = Pipeline(stages=stages)
# Running the feature transformations.
# - fit() computes feature statistics as needed
# - transform() actually transforms the features
pipelineModel = pipeline.fit(results)
results = pipelineModel.transform(results)

# Keep relevant columns
selectedcols = ["label", "features"] + cols
TrainingData = results.select(selectedcols)
display(TrainingData)
# predictData = results.select(selectedcols)
# display(predictData)

DataFrame[label: double, features: vector, Gender: string, Account_Type: string, Age: string, Education: string, Employment: string, Salary: int, Employer_Stability: string, Customer_Loyalty: int, Balance: int, Residential_Status: string, Service_Level: double]

In [45]:
# Splitting data randomly into training and test sets. set seed for reproducibility
(trainData, testData) = TrainingData.randomSplit([0.7, 0.3], seed = 100)

print trainData.count()
print testData.count()
# print(predictData).count()

699367
300633


In [14]:
from pyspark.ml.classification import LogisticRegression, OneVsRest, LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import tempfile
from os.path import join as pjoin

# ## save and load model
# temp_path = tempfile.mkdtemp()
# #globs['temp_path'] = temp_path
# reg_path = temp_path + '/reg'

reg = LogisticRegression(labelCol="label", featuresCol="features", maxIter=1000, regParam=0.01, elasticNetParam=0.8, family="multinomial" )
regModel = reg.fit(trainData)

predict = regModel.transform(testData)
predict.select("prediction", "label", "features").show()
#predict = model2.transform(testData)
# predict = model2.transform(predictData)
# predict.select("prediction", "label", "features").show()

# load saved model
# reg2 = LogisticRegression.load(reg_path)
# regModel2 = LogisticRegressionModel.load(model_path)
# predict = regModel2.transform(predictData)
# predict.select("prediction", "label", "features").show()
        

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predict)

## save and load model
#temp_path = tempfile.mkdtemp()
temp_path = pjoin("/home/maffsojah/Projects/HIT_400/capstone_project/web/tbank/spark-warehouse")
#globs['temp_path'] = temp_path
reg_path = temp_path + '/reg2'
reg.save(reg_path)
#model2 = LogisticRegression.load(reg_path)
#model2.getMaxIter()

model_path = temp_path + '/reg_model2'
regModel.save(model_path)
#model2 = LogisticRegressionModel.load(model_path)

print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = %g Percent" % (accuracy * 100))
print("Coefficients: \n" + str(regModel.coefficientMatrix))
print("Intercept: " + str(regModel.interceptVector))
#print("coefficientMatrix check = %g " % (regModel.coefficientMatrix[0, 1] == model2.coefficientMatrix[0, 1]))
#print("interceptVector check = %g " % (regModel.interceptVector == model2.interceptVector))


+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
|       0.0|  0.0|(12,[0,1,2,4,9,11...|
+----------+-----+--------------------+
only showing top 20 rows

Test Error = 0

In [11]:
print evaluator.explainParams()

labelCol: label column name. (default: label, current: label)
metricName: metric name in evaluation (f1|weightedPrecision|weightedRecall|accuracy) (default: f1, current: accuracy)
predictionCol: prediction column name. (default: prediction, current: prediction)


In [21]:
regModel.transform(testData).show()

+-----+--------------------+------+---------------+-------+--------------------+----------+------+------------------+----------------+-------+------------------+-------------+--------------------+--------------------+----------+
|label|            features|Gender|   Account_Type|    Age|           Education|Employment|Salary|Employer_Stability|Customer_Loyalty|Balance|Residential_Status|Service_Level|       rawPrediction|         probability|prediction|
+-----+--------------------+------+---------------+-------+--------------------+----------+------+------------------+----------------+-------+------------------+-------------+--------------------+--------------------+----------+
|  0.0|(12,[0,1,2,4,9,11...|  Male|Current Account|   60 +|  Tertiary and above| Permanent|   236|            Stable|               0|    386|             Owned|          0.0|[7.31290890911882...|[0.99910414772966...|       0.0|
|  0.0|(12,[0,1,2,4,9,11...|  Male|Current Account|   60 +|  Tertiary and above| Per

In [23]:
predict = model2.transform(predictData)
# predict.select("prediction", "label", "features").show()

In [24]:
from os.path import join as pjoin

## save and load model
#temp_path = tempfile.mkdtemp()
temp_path = pjoin("/home/maffsojah/Projects/HIT_400/capstone_project/web/tbank/spark-warehouse")
#globs['temp_path'] = temp_path
reg_path = temp_path + '/reg'
reg.save(reg_path)
#model2 = LogisticRegression.load(reg_path)
#model2.getMaxIter()

model_path = temp_path + 'reg_model'
regModel.save(model_path)
#model2 = LogisticRegressionModel.load(model_path)

In [25]:
model2 = LogisticRegression.load(reg_path)
model2.getMaxIter()

1000

In [59]:
### MultiLayerPerceptron Model

from pyspark.ml.classification import MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data into train and test
# splits = results.randomSplit([0.6, 0.4], 1234)
# train = splits[0]
# test = splits[1]

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [8, 9, 8, 3]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=1000, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(trainData)


In [60]:
model.layers

[8, 9, 8, 3]

In [61]:
model.weights.size

188

In [62]:
# compute accuracy on the test set
model.transform(testData).show()
predictionAndLabels = mlpred.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
#print("Test set accuracy = " + evaluator.evaluate(predictionAndLabels))

Py4JJavaError: An error occurred while calling o1867.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1953.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1953.0 (TID 7623, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:41)
	at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
	at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:483)
	at org.apache.spark.ml.ann.FeedForwardModel.predict(Layer.scala:530)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predict(MultilayerPerceptronClassifier.scala:317)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predict(MultilayerPerceptronClassifier.scala:291)
	at org.apache.spark.ml.PredictionModel$$anonfun$1.apply(Predictor.scala:193)
	at org.apache.spark.ml.PredictionModel$$anonfun$1.apply(Predictor.scala:192)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
	at sun.reflect.GeneratedMethodAccessor278.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:41)
	at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
	at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:483)
	at org.apache.spark.ml.ann.FeedForwardModel.predict(Layer.scala:530)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predict(MultilayerPerceptronClassifier.scala:317)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predict(MultilayerPerceptronClassifier.scala:291)
	at org.apache.spark.ml.PredictionModel$$anonfun$1.apply(Predictor.scala:193)
	at org.apache.spark.ml.PredictionModel$$anonfun$1.apply(Predictor.scala:192)
	... 16 more


In [66]:
from os.path import join as pjoin
from pyspark.ml.linalg import Vectors

## save and load model
#temp_path = tempfile.mkdtemp()
temp_path = pjoin("/home/maffsojah/Projects/HIT_400/capstone_project/web/tbank/spark-warehouse")

mlp_path = temp_path + "/mlp"
trainer.save(mlp_path)
mlp2 = MultilayerPerceptronClassifier.load(mlp_path)
mlp2.getBlockSize()


model_path = temp_path + "/mlp_model"
model.save(model_path)
model2 = MultilayerPerceptronClassificationModel.load(model_path)
model.layers == model2.layers

model.weights == model2.weights



True