In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler


In [2]:
# Load the raw "bank" table. Its original column names are remembered so they can be
# re-selected next to "label"/"features" after the feature pipeline has run.
dataset = spark.table("bank")
cols = list(dataset.columns)

In [3]:
# Categorical input columns; each one gets a StringIndexer -> OneHotEncoderEstimator pair.
categoricalColumns = [
    "job", "marital", "education", "default", "housing",
    "loan", "contact", "month", "poutcome",
]

# Pipeline stages are only declared here; nothing executes until the Pipeline is fit.
stages = []
for categoricalCol in categoricalColumns:
    indexedCol = categoricalCol + "Index"
    oneHotCol = categoricalCol + "classVec"
    # Map each category string to a numeric index ...
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
    # ... then expand that index into a binary (one-hot) vector column.
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[oneHotCol])
    stages.extend([stringIndexer, encoder])

In [4]:
# Index the string target column "y" into the numeric "label" column the classifier trains on.
label_stringIdx = StringIndexer(inputCol="y", outputCol="label")
stages.append(label_stringIdx)

In [5]:
# Numeric columns are fed to the feature vector unchanged.
numericCols = ["age", "balance", "day", "duration", "campaign", "pdays", "previous"]

# Feature vector layout: every categorical one-hot vector first, then the numeric columns.
assemblerInputs = [c + "classVec" for c in categoricalColumns]
assemblerInputs += numericCols

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

dataset

In [6]:
from pyspark.ml.classification import LogisticRegression
  
# Fit every preprocessing stage on the full dataset, then apply them to get a frame
# that additionally carries the "label" and "features" columns.
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)


In [7]:
# Baseline: a default LogisticRegression fitted on the full prepped data (before the
# train/test split), so the ROC shown below is measured on the training data itself.
baselineLr = LogisticRegression()
lrModel = baselineLr.fit(preppedDataDF)

# NOTE(review): `display(model, df, "ROC")` is presumably the Databricks notebook helper.
display(lrModel, preppedDataDF, "ROC")

In [8]:
# Display the baseline model over the prepped data without naming a plot type.
# NOTE(review): the resulting visualization is chosen by the Databricks `display` helper — confirm which.
display(lrModel, preppedDataDF)

In [9]:
# Keep the model inputs ("label", "features") followed by all original raw columns.
# NOTE: this rebinds `dataset`; re-running the pipeline cell afterwards would operate on
# this already-prepped frame instead of the raw table.
selectedcols = ["label", "features"]
selectedcols.extend(cols)
dataset = preppedDataDF.select(selectedcols)
display(dataset)

In [10]:
# 70/30 random split into training and test sets; the fixed seed keeps the split reproducible.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)
for split in (trainingData, testData):
    print(split.count())

In [11]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression trained on the training split only, capped at 10 iterations.
# This rebinds `lrModel`, replacing the baseline model fitted on the full data.
lr = LogisticRegression(maxIter=10, featuresCol="features", labelCol="label")
lrModel = lr.fit(trainingData)

In [12]:
# Save the trained model, overwriting any existing copy at the same path.
lrWriter = lrModel.write().overwrite()
lrWriter.save("modelo_lr1")

In [13]:
# Score the held-out test split; the model only reads the 'features' column.
predictions = lrModel.transform(testData)

previewCols = ['age', 'job', 'label', 'rawPrediction', 'prediction', 'probability']
predictions.select(*previewCols).show(10)

In [14]:
# Labels, predictions and class probabilities, plus a few raw columns for context.
shownCols = ["label", "prediction", "rawPrediction", "probability", "age", "job", "education"]
selected = predictions.select(shownCols)
display(selected)

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Binary evaluator over the raw scores, using its default metric. It is reused below for cross-validation.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
testMetric = evaluator.evaluate(predictions)
testMetric

In [16]:
# Name of the metric reported by `evaluator.evaluate`; no metricName was passed, so this is the evaluator's default.
evaluator.getMetricName()

In [17]:
#print(lr.explainParams())

In [18]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for cross-validation: 3 x 3 x 3 = 27 combinations of
# regParam, elasticNetParam and maxIter for the `lr` estimator.
gridBuilder = ParamGridBuilder()
gridBuilder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
gridBuilder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
gridBuilder.addGrid(lr.maxIter, [1, 5, 10])
paramGrid = gridBuilder.build()

In [19]:
# 5-fold cross-validation over every paramGrid combination (27 combinations x 5 folds),
# scored with `evaluator`. This is expensive: expect it to take a while.
# `seed` fixes the random fold assignment so the selected model is reproducible;
# it uses the same value as the train/test split.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator,
                    numFolds=5, seed=100)

cvModel = cv.fit(trainingData)

In [20]:
# Save the fitted cross-validation model, overwriting any existing copy at the same path.
cvWriter = cvModel.write()
cvWriter.overwrite()
cvWriter.save("modelo_cv")

In [21]:
# cvModel.transform scores with the best model found during cross-validation.
predictions = cvModel.transform(testData)

# Evaluate that best model on the held-out test split (evaluator's default metric).
evaluator.evaluate(predictions)

In [22]:
# Intercept of the best model selected by cross-validation.
bestLrModel = cvModel.bestModel
print('Model Intercept: ', bestLrModel.intercept)

In [23]:
# Coefficients of the best cross-validated model, one per slot of the "features" vector.
coefficients = cvModel.bestModel.coefficients

# Convert numpy scalars to plain floats and wrap each in a 1-tuple so Spark can build rows.
weights = [(float(w),) for w in coefficients]

# Build the frame through the SparkSession `spark`, as the rest of the notebook does,
# instead of the legacy SQLContext entry point.
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

In [24]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Confusion matrix and summary statistics for the best cross-validated model on the test split.
# pyspark.mllib's MulticlassMetrics expects an RDD of (prediction, label) pairs, in that order;
# passing (label, prediction) would transpose the confusion matrix.
selected = predictions.select("prediction", "label").rdd.map(tuple)
metrics = MulticlassMetrics(selected)
print(metrics.confusionMatrix())

# Overall statistics. The argument-less precision()/recall()/fMeasure() are deprecated
# (removed in Spark 3) and all equal accuracy, so report accuracy explicitly plus
# label-frequency-weighted precision / recall / F1.
accuracy = metrics.accuracy
precision = metrics.weightedPrecision
recall = metrics.weightedRecall
f1Score = metrics.weightedFMeasure()
print("Summary Stats")
print("Accuracy = %s" % accuracy)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)


In [25]:
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
from pyspark.ml.linalg import Vectors
from pyspark.mllib.linalg import SparseVector

# Reload the logistic-regression model saved earlier and score one hand-encoded example.
modelo = LogisticRegressionModel.load("modelo_lr1")

# The model only reads "features", so the row carries just a label and a pre-encoded
# 42-slot vector: one-hot categorical slots first, then the numeric columns in assembler order.
# Previously the row also had raw fields (age 24, balance 1470, ...) that contradicted the
# vector (age slot = 58, balance slot = 2143), and only 2 column names were given for 18 fields.
# NOTE(review): indices/values hard-code the pipeline's encoding and go stale if categories or
# column lists change — TODO build this row by running raw values through pipelineModel.
data1 = [(0.0, Vectors.sparse(42,
                              [1, 11, 14, 16, 17, 18, 20, 21, 32, 35, 36, 37, 38, 39, 40],
                              [1, 1, 1, 1, 1, 1, 1, 1, 1, 58, 2143, 5, 261, 1, -1]))]
data2 = spark.createDataFrame(data1, ["label", "features"])

predictions = modelo.transform(data2)

selected = predictions.select("label", "prediction", "rawPrediction", "probability")
display(selected)