# Binary Classification Example

The Pipelines API provides a higher-level API built on top of DataFrames for constructing ML pipelines.
You can read more about the Pipelines API in the [programming guide](https://spark.apache.org/docs/latest/ml-guide.html).

**Binary Classification** is the task of predicting a binary label.
E.g., is an email spam or not spam? Should I show this ad to this user or not? Will it rain tomorrow or not?
This section demonstrates algorithms for making these types of predictions.

## Dataset Review

We will use the Adult dataset from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult), which comes preloaded in Databricks.
It consists of census data, and the goal is to predict whether a person's income is above or below 50K USD per year (if only we were all up there...).

The difficulty here is that the dataset mixes continuous, ordinal discrete, and nominal variables.

Attribute Information:

- age: continuous
- workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
- fnlwgt: continuous
- education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc...
- education-num: continuous
- marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent...
- occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners...
- relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
- race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
- sex: Female, Male
- capital-gain: continuous
- capital-loss: continuous
- hours-per-week: continuous
- native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany...

Target/Label: <=50K, >50K

## Load Data

In [0]:
%fs ls databricks-datasets/adult/adult.data

path,name,size
dbfs:/databricks-datasets/adult/adult.data,adult.data,3974305


In [0]:
%fs head databricks-datasets/adult/adult.data

In [0]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

schema = StructType([
  StructField("age", DoubleType(), False),
  StructField("workclass", StringType(), False),
  StructField("fnlwgt", DoubleType(), False),
  StructField("education", StringType(), False),
  StructField("education_num", DoubleType(), False),
  StructField("marital_status", StringType(), False),
  StructField("occupation", StringType(), False),
  StructField("relationship", StringType(), False),
  StructField("race", StringType(), False),
  StructField("sex", StringType(), False),
  StructField("capital_gain", DoubleType(), False),
  StructField("capital_loss", DoubleType(), False),
  StructField("hours_per_week", DoubleType(), False),
  StructField("native_country", StringType(), False),
  StructField("income", StringType(), False)
])

dataset = spark.read.format("csv").schema(schema).load("/databricks-datasets/adult/adult.data")
cols = dataset.columns

In [0]:
display(dataset)

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
39.0,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
37.0,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
52.0,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
31.0,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
42.0,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K


## Preprocess Data

We need to preprocess the data before feeding it to the model. To do that we will use MLlib pipelines.

* String Indexer

  Maps each string to an index, e.g. "hola": 0, "asd": 1, "pp": 2.

* One-Hot Encoding

  Converts each category index into a binary vector. With Spark's defaults the last category is dropped and encoded as all zeros, e.g. (0: [1, 0]), (1: [0, 1]), (2: [0, 0]).

To handle the categorical features we will use a combination of [StringIndexer] and [OneHotEncoderEstimator].

`OneHotEncoderEstimator` returns a [SparseVector]. Note: [OneHotEncoderEstimator] is [renamed as OneHotEncoder] in Spark 3.0, which is the name used in the code in this notebook.

Since we have more than one categorical column, we will use a [Pipeline] to chain the transformations.

[StringIndexer]: http://spark.apache.org/docs/latest/ml-features.html#stringindexer
[OneHotEncoderEstimator]: https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator
[SparseVector]: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.linalg.SparseVector
[Pipeline]: http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline
[renamed as OneHotEncoder]: https://issues.apache.org/jira/browse/SPARK-26133
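
To make the two steps concrete, here is a minimal pure-Python sketch of what indexing and one-hot encoding do to a single column. It is not Spark code: `fit_string_index` and `one_hot` are hypothetical helpers and the sample values are made up. It assumes the default behaviour of Spark 3.x, where `StringIndexer` gives index 0 to the most frequent label and `OneHotEncoder` drops the last category.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


# Made-up sample of the workclass column
workclass = ["Private", "State-gov", "Private",
             "Self-emp-not-inc", "Private", "Self-emp-not-inc"]

index = fit_string_index(workclass)
encoded = [one_hot(index[w], len(index)) for w in workclass]

for label, vec in zip(workclass, encoded):
    print(label, index[label], vec)
```

The least frequent label ends up as the all-zeros vector, so a column with k categories only needs k - 1 positions.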

In [0]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [0]:
# Let's try a single categorical column first: workclass
# Remember that a pipeline is a chain of steps applied one after another.
# StringIndexer -> Encoder -> Output

stringIndexer = StringIndexer(inputCol="workclass", outputCol= "workclassIndex")
encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=["workclassclassVec"])

indexer = stringIndexer.fit(dataset[["workclass"]]).transform(dataset[["workclass"]])
display(encoder.fit(indexer).transform(indexer))

workclass,workclassIndex,workclassclassVec
State-gov,4.0,"List(0, 8, List(4), List(1.0))"
Self-emp-not-inc,1.0,"List(0, 8, List(1), List(1.0))"
Private,0.0,"List(0, 8, List(0), List(1.0))"
Private,0.0,"List(0, 8, List(0), List(1.0))"
Private,0.0,"List(0, 8, List(0), List(1.0))"
Private,0.0,"List(0, 8, List(0), List(1.0))"
Private,0.0,"List(0, 8, List(0), List(1.0))"
Self-emp-not-inc,1.0,"List(0, 8, List(1), List(1.0))"
Private,0.0,"List(0, 8, List(0), List(1.0))"
Private,0.0,"List(0, 8, List(0), List(1.0))"


In [0]:
# Now that we have seen how it works for one column, let's do the same for all categorical features:
# StringIndexer[Feature1] -> Encoder[Feature1] -> StringIndexer[Feature2] -> Encoder[Feature2] -> StringIndexer[Feature3] -> Encoder[Feature3]...

categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]

stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors

    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]
    
# This list holds all the pipeline stages so far
print(stages)

The categorical features are now encoded; next we convert the target to 0/1. One-hot encoding is not needed here because the target has only two categories.

In [0]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]

We will use `VectorAssembler` to combine all the features (continuous + categorical) into a single vector. Recall that one-hot encoding first required a StringIndexer step, whose index columns are not needed to train the model. To pick out only the encoded columns, we gave them names ending in "classVec".
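
As a rough illustration of what the assembler produces, the sketch below concatenates a few per-column vectors and reports the result as a (size, indices, values) triple, which is how sparse vectors are commonly described. `assemble` is a hypothetical helper and the input values are invented; the real `VectorAssembler` operates on DataFrame columns.

```python
def assemble(blocks):
    """Concatenate dense blocks into one sparse (size, indices, values) triple."""
    indices, values, offset = [], [], 0
    for block in blocks:
        for i, v in enumerate(block):
            if v != 0.0:
                indices.append(offset + i)  # position in the combined vector
                values.append(v)
        offset += len(block)
    return offset, indices, values


# Invented inputs: two one-hot encoded columns followed by three numeric columns
race_vec = [0.0, 1.0, 0.0, 0.0]
sex_vec = [1.0]
numeric = [39.0, 0.0, 40.0]

size, indices, values = assemble([race_vec, sex_vec, numeric])
print(size, indices, values)
```

Only the non-zero entries are stored, which is why one-hot encoded columns stay cheap even when a category has many levels.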

In [0]:
# Transform all features into a vector using VectorAssembler
cateFeaturesCols = [c + "classVec" for c in categoricalColumns]
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]

print("Categorical features:", cateFeaturesCols)
print("\nContinuous features:", numericCols)

assemblerInputs = cateFeaturesCols + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

print("\n")
print(stages)

We run the pipeline built above (fit + transform) and select the columns we are interested in.

In [0]:
# Apply the data transformation pipeline

partialPipeline = Pipeline().setStages(stages) # Register the stages we want to apply
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

# Select the columns we are interested in
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

label,features,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
0.0,"List(0, 100, List(4, 10, 24, 32, 44, 48, 52, 53, 94, 95, 96, 97, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 39.0, 77516.0, 13.0, 2174.0, 40.0))",39.0,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
0.0,"List(0, 100, List(1, 10, 23, 31, 43, 48, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 50.0, 83311.0, 13.0, 13.0))",50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
0.0,"List(0, 100, List(0, 8, 25, 38, 44, 48, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 38.0, 215646.0, 9.0, 40.0))",38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
0.0,"List(0, 100, List(0, 13, 23, 38, 43, 49, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 53.0, 234721.0, 7.0, 40.0))",53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
0.0,"List(0, 100, List(0, 10, 23, 29, 47, 49, 62, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 28.0, 338409.0, 13.0, 40.0))",28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
0.0,"List(0, 100, List(0, 11, 23, 31, 47, 48, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.0, 284582.0, 14.0, 40.0))",37.0,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
0.0,"List(0, 100, List(0, 18, 28, 34, 44, 49, 64, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 49.0, 160187.0, 5.0, 16.0))",49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
1.0,"List(0, 100, List(1, 8, 23, 31, 43, 48, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 52.0, 209642.0, 9.0, 45.0))",52.0,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
1.0,"List(0, 100, List(0, 11, 24, 29, 44, 48, 53, 94, 95, 96, 97, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 31.0, 45781.0, 14.0, 14084.0, 50.0))",31.0,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
1.0,"List(0, 100, List(0, 10, 23, 31, 43, 48, 52, 53, 94, 95, 96, 97, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 42.0, 159449.0, 13.0, 5178.0, 40.0))",42.0,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K


In [0]:
# Randomly split data into training and test sets; set the seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())
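
The two counts printed above will be close to a 70/30 split but usually not exact, because each row is assigned to a split at random. The pure-Python sketch below imitates that idea; `random_split` is a hypothetical helper, not Spark's implementation, and the 1000 rows are made up.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. roughly [0.7, 1.0] for weights [0.7, 0.3]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point leftover
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=100)
print(len(train), len(test))  # close to 700 and 300, not necessarily exact
```

Fixing the seed makes the assignment repeatable, which is what the `seed=100` argument is for in the cell above.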

## Fit and Evaluate Models

We are now ready to try the binary classification models available in the Pipelines API.

The algorithms we will use are:
1. Logistic Regression
2. Decision Tree Classifier
3. Random Forest Classifier

The steps we will follow are:
- Create a base model
- Tune it with `ParamGridBuilder` and 5-fold cross-validation
- Evaluate the winning hyperparameters on the test set

We will use `BinaryClassificationEvaluator` to evaluate the models, with [areaUnderROC] as the default metric.

[areaUnderROC]: https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve
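
For intuition, the area under the ROC curve can be read as the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one. The sketch below computes it that way in pure Python on made-up labels and scores. `area_under_roc` is a hypothetical helper; Spark's evaluator builds the curve from thresholds instead, so its result may differ slightly on real data.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


labels = [0, 0, 1, 1]            # made-up true labels
scores = [0.1, 0.4, 0.35, 0.8]   # made-up predicted probabilities of class 1

auc = area_under_roc(labels, scores)
print(auc)  # 3 of the 4 positive/negative pairs are ordered correctly -> 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, regardless of the threshold used to turn scores into predictions.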

## 1. Logistic Regression

For more information about [Logistic Regression], see the [classification and regression] section of the MLlib Programming Guide.

We will first fit a baseline logistic regression and then spice it up with regularization, which requires searching for good hyperparameters.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Logistic Regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression

In [0]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [0]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [0]:
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well. For example's sake we will choose age & occupation
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,1.0,"List(1, 2, List(), List(0.16304404160704025, 0.8369559583929598))",36.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.701186532553927, 0.29881346744607296))",32.0,Prof-specialty
0.0,1.0,"List(1, 2, List(), List(0.49801131876699517, 0.5019886812330049))",33.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6812616518641682, 0.3187383481358317))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6108620507116007, 0.3891379492883993))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6096341652357772, 0.39036583476422276))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.600259838311847, 0.399740161688153))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5998980983235306, 0.40010190167646953))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7478554516615129, 0.25214454833848704))",34.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.9885496846453006, 0.011450315354699533))",20.0,Prof-specialty


We will use ``BinaryClassificationEvaluator`` to evaluate the model. By default, ``BinaryClassificationEvaluator`` uses ``areaUnderROC``.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
print("Performance:",evaluator.evaluate(predictions))
evaluator.getMetricName() # check that the metric really is areaUnderROC

The evaluator currently accepts 2 kinds of metrics - areaUnderROC and areaUnderPR.
We can set it to areaUnderPR by using evaluator.setMetricName("areaUnderPR").

Now we will try tuning the model with the ``ParamGridBuilder`` and the ``CrossValidator``.

If you are unsure what params are available for tuning, you can use ``explainParams()`` to print a list of all params and their definitions.

In [0]:
print(lr.explainParams())

We will try 3 values for regParam, 3 values for elasticNetParam, and 3 values for maxIter.

This gives a grid of 3 x 3 x 3 = 27 parameter combinations for cross-validation.

We will use 5-fold cross-validation.
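
To see where the cost comes from, the grid can be enumerated in plain Python. This is only a counting sketch using the same values as the `ParamGridBuilder` cell in this section; it does not train anything. It also assumes that `CrossValidator` refits the best combination once on the full training set after the folds are done.

```python
from itertools import product

# Same candidate values as the ParamGridBuilder cell in this section
grid = {
    "regParam": [0.01, 0.5, 2.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [1, 5, 10],
}

# One dict per combination, e.g. {"regParam": 0.01, "elasticNetParam": 0.0, "maxIter": 1}
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

num_folds = 5
cv_fits = len(param_maps) * num_folds  # one fit per combination per fold
total_fits = cv_fits + 1               # plus the final refit of the best combination

print(len(param_maps), cv_fits, total_fits)  # 27 135 136
```

Adding one more value to any of the three lists adds 9 combinations, or 45 extra fits, so grids grow expensive quickly.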

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation. We vary the regularization strength (regParam), the ridge/lasso mix (elasticNetParam) and the number of iterations (maxIter) so the model generalizes better
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [0]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the number of models that we're creating and testing

In [0]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [0]:
# cvModel keeps the best model found during cross-validation
# Evaluate best model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

We can also access the model's feature weights and intercepts easily
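
As a reminder of what those numbers mean, a binary logistic regression turns the weighted sum of the features plus the intercept into a probability through the sigmoid function. The sketch below uses made-up weights and features; `predict_proba` is a hypothetical helper, not part of the Spark API.

```python
import math


def predict_proba(features, weights, intercept):
    """Probability of class 1 for a binary logistic regression."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return 1.0 / (1.0 + math.exp(-margin))


# Made-up model with two features
weights = [0.8, -0.5]
intercept = -0.3

p_balanced = predict_proba([1.0, 1.0], weights, intercept)  # margin is about 0, so p is about 0.5
p_baseline = predict_proba([0.0, 0.0], weights, intercept)  # only the intercept contributes

print(p_balanced, p_baseline)
```

A negative weight pushes the probability of class 1 down as the feature grows, and the intercept sets the probability when every feature is zero.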

In [0]:
print('Model Intercept: ', cvModel.bestModel.intercept)

In [0]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

Feature Weight
-0.2816032757048572
-0.6264483359098225
-0.4360275569861357
-0.506424771170999
-0.5063266891180264
-0.0049481441717131
0.0708698962394766
-2.669789381029706
-0.559356701414903
-0.2239437895813227


In [0]:
# View best model's predictions and probabilities of each prediction class
from pyspark.ml.functions import vector_to_array

selected = predictions.select("label", "prediction", "probability", "age", "occupation") # keep only these columns

selected = selected.withColumn("xs", vector_to_array("probability"))  # turn the probability vector into an array column
display(selected.withColumn("proba1", selected["xs"].getItem(1)).drop("xs")) # keep the probability of class 1

# display(selected)

label,prediction,probability,age,occupation,proba1
0.0,1.0,"List(1, 2, List(), List(0.23296419268391308, 0.767035807316087))",36.0,Prof-specialty,0.767035807316087
0.0,0.0,"List(1, 2, List(), List(0.6552066745246352, 0.34479332547536473))",32.0,Prof-specialty,0.3447933254753647
0.0,0.0,"List(1, 2, List(), List(0.5391022452506138, 0.46089775474938627))",33.0,Prof-specialty,0.4608977547493862
0.0,0.0,"List(1, 2, List(), List(0.637341694464255, 0.36265830553574496))",39.0,Prof-specialty,0.3626583055357449
0.0,0.0,"List(1, 2, List(), List(0.6016034358099872, 0.39839656419001285))",39.0,Prof-specialty,0.3983965641900128
0.0,0.0,"List(1, 2, List(), List(0.5947767904063226, 0.4052232095936774))",50.0,Prof-specialty,0.4052232095936774
0.0,0.0,"List(1, 2, List(), List(0.5899376739565216, 0.4100623260434785))",51.0,Prof-specialty,0.4100623260434785
0.0,0.0,"List(1, 2, List(), List(0.5976373681713878, 0.4023626318286122))",60.0,Prof-specialty,0.4023626318286122
0.0,0.0,"List(1, 2, List(), List(0.6907455609652339, 0.3092544390347661))",34.0,Prof-specialty,0.3092544390347661
0.0,0.0,"List(1, 2, List(), List(0.9578400279746343, 0.04215997202536573))",20.0,Prof-specialty,0.0421599720253657


In [0]:
selected.printSchema() # inspect the column types

## Decision Trees

You can read more about [Decision Trees](http://spark.apache.org/docs/latest/mllib-decision-tree.html) in the Spark MLlib Programming Guide.

We will create a base model first and then search for hyperparameters.
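
Before fitting one, it may help to recall how a decision tree makes a prediction: it tests one feature at a time and follows the matching branch until it reaches a leaf. The sketch below walks a tiny hand-written tree in pure Python. The tree, its thresholds and the `predict` helper are all hypothetical; it only covers continuous splits (go left when the value is at or below the threshold), while categorical splits test set membership instead.

```python
def predict(node, features):
    """Follow continuous splits from the root until a leaf is reached."""
    while "prediction" not in node:
        if features[node["feature"]] <= node["threshold"]:
            node = node["left"]
        else:
            node = node["right"]
    return node["prediction"]


# Hypothetical tree of depth 2: feature 0 is a capital gain, feature 1 is years of education
tree = {
    "feature": 0, "threshold": 5000.0,
    "left": {
        "feature": 1, "threshold": 12.5,
        "left": {"prediction": 0.0},
        "right": {"prediction": 1.0},
    },
    "right": {"prediction": 1.0},
}

print(predict(tree, [0.0, 9.0]))     # low gain, few years of education
print(predict(tree, [0.0, 14.0]))    # low gain, more years of education
print(predict(tree, [7000.0, 9.0]))  # high gain
```

The `maxDepth` hyperparameter caps how many of these tests can be chained, which limits how finely the tree can partition the data.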

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

We check the fitted tree's node count and depth against the hyperparameters we set.

In [0]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

In [0]:
display(dtModel)

treeNode
"{""index"":5,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0],""feature"":23,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":7792.0,""categories"":null,""feature"":97,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":20.5,""categories"":null,""feature"":94,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":9,""featureType"":""continuous"",""prediction"":null,""threshold"":12.5,""categories"":null,""feature"":96,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":3368.0,""categories"":null,""feature"":97,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":8,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [0]:
# Look at the base model's predictions
predictions = dtModel.transform(testData)

In [0]:
predictions.printSchema()

In [0]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",36.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",32.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",33.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",34.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7010492093985518, 0.2989507906014482))",20.0,Prof-specialty


We evaluate the model with the same `BinaryClassificationEvaluator` class that we used for logistic regression.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Now we use ``ParamGridBuilder`` and ``CrossValidator``.

In [0]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

In [0]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# Takes ~5 minutes

In [0]:
print("numNodes = ", cvModel.bestModel.numNodes)
print("depth = ", cvModel.bestModel.depth)

In [0]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [0]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [0]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"List(1, 2, List(), List(0.5089285714285714, 0.49107142857142855))",36.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.8206106870229007, 0.17938931297709923))",32.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.8206106870229007, 0.17938931297709923))",33.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6779661016949152, 0.3220338983050847))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6779661016949152, 0.3220338983050847))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6779661016949152, 0.3220338983050847))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6779661016949152, 0.3220338983050847))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.9274193548387096, 0.07258064516129033))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6779661016949152, 0.3220338983050847))",34.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.9145077720207254, 0.08549222797927461))",20.0,Prof-specialty


## Random Forest

Your turn! We will fit a base model as before and then tune it with the hyperparameters I leave for you below.

You can read more about [Random Forest] from the [classification and regression] section of MLlib Programming Guide.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Random Forest]: https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forests
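
Conceptually, a random forest combines many decision trees, each trained on a different random sample of the data, and aggregates their outputs. The sketch below shows one simple aggregation, averaging the class probabilities of each tree, on made-up numbers. `forest_predict` is a hypothetical helper and the exact aggregation inside Spark may differ in detail.

```python
def forest_predict(tree_probabilities):
    """Average per-tree class probabilities and pick the most likely class."""
    num_trees = len(tree_probabilities)
    num_classes = len(tree_probabilities[0])
    averaged = [
        sum(tree[c] for tree in tree_probabilities) / num_trees
        for c in range(num_classes)
    ]
    prediction = float(max(range(num_classes), key=lambda c: averaged[c]))
    return averaged, prediction


# Made-up [P(class 0), P(class 1)] outputs of three trees for one row
tree_probabilities = [[0.9, 0.1], [0.6, 0.4], [0.3, 0.7]]

averaged, prediction = forest_predict(tree_probabilities)
print(averaged, prediction)
```

Averaging smooths out the mistakes of individual trees, which is why `numTrees` appears in the tuning grid later in this section.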

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Create the base model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train the model
rfModel = rf.fit(trainingData)

In [0]:
# Make predictions
predictions = rfModel.transform(testData)

In [0]:
predictions.printSchema()

In [0]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

We evaluate with `BinaryClassificationEvaluator`.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Now with cross-validation.

In [0]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [0]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations.  This can take about 6 minutes since it is training over 20 trees!
cvModel = cv.fit(trainingData)

In [0]:
predictions = cvModel.transform(testData)

In [0]:
# Check the area under the ROC curve
evaluator.evaluate(predictions)

In [0]:
# Predictions from the winning model
selected = predictions.select("label", "prediction", "probability", "age", "occupation")

selected = selected.withColumn("xs", vector_to_array("probability"))  # turn the probability vector into an array column
selected = selected.withColumn("positive_proba", selected["xs"].getItem(1)).drop("xs") # keep the probability of class 1

display(selected)

label,prediction,probability,age,occupation,positive_proba
0.0,0.0,"List(1, 2, List(), List(0.5094807638277546, 0.49051923617224535))",36.0,Prof-specialty,0.4905192361722453
0.0,0.0,"List(1, 2, List(), List(0.661701148411863, 0.338298851588137))",32.0,Prof-specialty,0.338298851588137
0.0,0.0,"List(1, 2, List(), List(0.6475646101435547, 0.3524353898564453))",33.0,Prof-specialty,0.3524353898564453
0.0,0.0,"List(1, 2, List(), List(0.661701148411863, 0.338298851588137))",39.0,Prof-specialty,0.338298851588137
0.0,0.0,"List(1, 2, List(), List(0.6409016328210144, 0.3590983671789857))",39.0,Prof-specialty,0.3590983671789857
0.0,0.0,"List(1, 2, List(), List(0.661701148411863, 0.338298851588137))",50.0,Prof-specialty,0.338298851588137
0.0,0.0,"List(1, 2, List(), List(0.661701148411863, 0.338298851588137))",51.0,Prof-specialty,0.338298851588137
0.0,0.0,"List(1, 2, List(), List(0.6847313095078987, 0.31526869049210127))",60.0,Prof-specialty,0.3152686904921012
0.0,0.0,"List(1, 2, List(), List(0.6680692793701579, 0.331930720629842))",34.0,Prof-specialty,0.331930720629842
0.0,0.0,"List(1, 2, List(), List(0.8630135705355219, 0.1369864294644781))",20.0,Prof-specialty,0.1369864294644781


Let's register the data as a temporary view so that we can query it.

In [0]:
selected.select("label", "prediction", "positive_proba", "age", "occupation").createOrReplaceTempView("finalPredictions")

Let's try to understand how the model is reasoning. Let's see what happens when we cross occupation with age, broken down by the average probability of the positive class.

Then let's do an error analysis: where do we get the highest proportion of correct predictions?

In [0]:
%sql
SELECT age, occupation, AVG(positive_proba) AS mean_proba
FROM finalPredictions
GROUP BY age, occupation



age,occupation,mean_proba
51.0,Prof-specialty,0.4573448917956919
61.0,Adm-clerical,0.1789921031264386
47.0,Sales,0.3349311459845855
63.0,Machine-op-inspct,0.2239207898909655
38.0,Other-service,0.1405498367369025
67.0,Sales,0.0904406068865882
23.0,Machine-op-inspct,0.0767183348518177
24.0,Machine-op-inspct,0.1136901629682214
43.0,Handlers-cleaners,0.258078566210322
44.0,Farming-fishing,0.235706139611961


In [0]:
import pyspark.sql.functions as F

# Count correct predictions (hits) per age: prediction = label means the model was right
hits_by_age = spark.sql("""
    SELECT age, SUM(CASE WHEN prediction = label THEN 1 ELSE 0 END) AS hits, count(*) AS count
    FROM finalPredictions
    GROUP BY age
""")

# Proportion of correct predictions within each age
hits_by_age = hits_by_age.withColumn("hit_ratio", (F.col("hits") / F.col("count")))

display(hits_by_age)

age,hits,count,hit_ratio
70.0,21,24,0.875
67.0,38,44,0.8636363636363636
69.0,27,33,0.8181818181818182
88.0,1,2,0.5
49.0,131,170,0.7705882352941177
29.0,196,225,0.8711111111111111
75.0,18,19,0.9473684210526316
64.0,53,65,0.8153846153846154
47.0,149,212,0.7028301886792453
42.0,200,267,0.7490636704119851
