In [1]:
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import RFE
from sklearn.metrics import classification_report, confusion_matrix
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
dataset = spark.table("bank_full_bd3df_csv")
cols = dataset.columns

In [3]:
categoricalColumns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month","poutcome"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [4]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="y", outputCol="label")
stages += [label_stringIdx]

In [5]:
# Transform all features into a vector using VectorAssembler
numericCols = ["age", "balance", "day", "duration", "campaign", "pdays", "previous"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [6]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

In [7]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data
display(lrModel, preppedDataDF, "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.9102080989617556
0.0,0.1111111111111111,0.9102080989617556
0.0119047619047619,0.1111111111111111,0.8727759997784728
0.0119047619047619,0.2222222222222222,0.7336536465107624
0.0238095238095238,0.2222222222222222,0.7259079653018172
0.0357142857142857,0.2222222222222222,0.6134736427382863
0.0357142857142857,0.3333333333333333,0.5949502237042397
0.0476190476190476,0.3333333333333333,0.5214377378376418
0.0595238095238095,0.3333333333333333,0.4940334048594755
0.0714285714285714,0.3333333333333333,0.3992205115280179


In [8]:
display(lrModel, preppedDataDF)

fitted values,residuals
-4.463155554981384,-0.011394601680481
-2.5914586517811085,-0.0696901540898525
-4.970051147367216,-0.0068949228982034
-4.894938299514752,-0.0074287713183618
-4.464246210287158,-0.0113823222499832
-5.007168775932872,-0.006645360644032
-4.730144389014285,-0.0087479939193287
-2.22271352705246,-0.0977292688833315
-4.38058025598015,-0.0123633275334815
-3.3834761128355844,-0.0328158870851597


In [9]:
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

label,features,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0.0,"List(0, 42, List(1, 11, 14, 16, 17, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 58.0, 2143.0, 5.0, 261.0, 1.0, -1.0))",58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
0.0,"List(0, 42, List(2, 12, 13, 16, 17, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 44.0, 29.0, 5.0, 151.0, 1.0, -1.0))",44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
0.0,"List(0, 42, List(7, 11, 13, 16, 17, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 33.0, 2.0, 5.0, 76.0, 1.0, -1.0))",33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
0.0,"List(0, 42, List(0, 11, 16, 17, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 47.0, 1506.0, 5.0, 92.0, 1.0, -1.0))",47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
0.0,"List(0, 42, List(12, 16, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 33.0, 1.0, 5.0, 198.0, 1.0, -1.0))",33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no
0.0,"List(0, 42, List(1, 11, 14, 16, 17, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 35.0, 231.0, 5.0, 139.0, 1.0, -1.0))",35,management,married,tertiary,no,231,yes,no,unknown,5,may,139,1,-1,0,unknown,no
0.0,"List(0, 42, List(1, 12, 14, 16, 17, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 28.0, 447.0, 5.0, 217.0, 1.0, -1.0))",28,management,single,tertiary,no,447,yes,yes,unknown,5,may,217,1,-1,0,unknown,no
0.0,"List(0, 42, List(7, 14, 17, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 42.0, 2.0, 5.0, 380.0, 1.0, -1.0))",42,entrepreneur,divorced,tertiary,yes,2,yes,no,unknown,5,may,380,1,-1,0,unknown,no
0.0,"List(0, 42, List(5, 11, 15, 16, 17, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 58.0, 121.0, 5.0, 50.0, 1.0, -1.0))",58,retired,married,primary,no,121,yes,no,unknown,5,may,50,1,-1,0,unknown,no
0.0,"List(0, 42, List(2, 12, 13, 16, 17, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 43.0, 593.0, 5.0, 55.0, 1.0, -1.0))",43,technician,single,secondary,no,593,yes,no,unknown,5,may,55,1,-1,0,unknown,no


In [10]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

In [11]:
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [12]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [13]:
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well. For example's sake we will choose age & occupation
selected = predictions.select("label", "job", "education", "age")
display(selected)

label,job,education,age
0.0,blue-collar,secondary,27
0.0,blue-collar,secondary,28
0.0,blue-collar,secondary,28
0.0,blue-collar,secondary,28
0.0,blue-collar,secondary,29
0.0,blue-collar,secondary,29
0.0,blue-collar,secondary,29
0.0,blue-collar,secondary,29
0.0,blue-collar,secondary,30
0.0,blue-collar,secondary,30


In [14]:
# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [15]:
evaluator.getMetricName()

In [16]:
print(lr.explainParams())

In [17]:
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [18]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

In [19]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [20]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [21]:
print('Model Intercept: ', cvModel.bestModel.intercept)

In [22]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

Feature Weight
-0.2222247276205723
-0.0416248988050477
-0.0024997744071399
0.2014744286358824
-0.0934098791705363
0.4235262987344917
-0.1815665968419371
-0.1780356947015741
0.0454152487350491
-0.3608122814540584


In [23]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "job", "education", "age")
display(selected)

label,job,education,age
0.0,blue-collar,secondary,27
0.0,blue-collar,secondary,28
0.0,blue-collar,secondary,28
0.0,blue-collar,secondary,28
0.0,blue-collar,secondary,29
0.0,blue-collar,secondary,29
0.0,blue-collar,secondary,29
0.0,blue-collar,secondary,29
0.0,blue-collar,secondary,30
0.0,blue-collar,secondary,30


In [24]:
trainingSummary = lrModel.summary
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

In [25]:
from pyspark.sql.types import IntegerType

# notice the variable name (more below)
mylist = [0,42,[2,11,13,16,18,20,21,32,35,36,37,38,39,40],[1,1,1,1,1,1,1,1,53,-3,5,1666,1,-1]]


cvModel.transform(mylist)