## Notebook looking at the need to rebalance data set for building models

In [1]:
import data_engineering as de
df = de.main()
df.show(n=3)

+------+-----------+------+-------------+-------------+----------------+-------------------+-----------------+-----------------------+------------------+--------------------+-----+----------+---------------+------+-------------------------+---------+-----+----+----+------+--------+-----------+-------------+--------+----------------+--------------+-----------+---------+-------+-----------------+----------------+----------------------+-------------+
|UserId|UserTimeBin|Gender|PaidInTimeBin|FreeInTimeBin|ChurnedInTimeBin|DowngradedInTimeBin|UpgradedInTimeBin|DaysRegisteredAtTimeBin|WillChurnInNextBin|PreviouslyDowngraded|About|Add Friend|Add to Playlist|Cancel|Cancellation Confirmation|Downgrade|Error|Help|Home|Logout|NextSong|Roll Advert|Save Settings|Settings|Submit Downgrade|Submit Upgrade|Thumbs Down|Thumbs Up|Upgrade|SessionsInTimeBin|ArtistsInTimeBin|DistinctSongsInTimeBin|WillChurnSoon|
+------+-----------+------+-------------+-------------+----------------+-------------------+----

In [16]:
df.printSchema()

root
 |-- UserId: integer (nullable = true)
 |-- UserTimeBin: integer (nullable = true)
 |-- Gender: integer (nullable = true)
 |-- PaidInTimeBin: integer (nullable = true)
 |-- FreeInTimeBin: integer (nullable = true)
 |-- ChurnedInTimeBin: integer (nullable = true)
 |-- DowngradedInTimeBin: integer (nullable = true)
 |-- UpgradedInTimeBin: integer (nullable = true)
 |-- DaysRegisteredAtTimeBin: integer (nullable = true)
 |-- WillChurnInNextBin: integer (nullable = true)
 |-- PreviouslyDowngraded: integer (nullable = true)
 |-- About: integer (nullable = true)
 |-- Add Friend: integer (nullable = true)
 |-- Add to Playlist: integer (nullable = true)
 |-- Cancel: integer (nullable = true)
 |-- Cancellation Confirmation: integer (nullable = true)
 |-- Downgrade: integer (nullable = true)
 |-- Error: integer (nullable = true)
 |-- Help: integer (nullable = true)
 |-- Home: integer (nullable = true)
 |-- Logout: integer (nullable = true)
 |-- NextSong: integer (nullable = true)
 |-- Roll 

### Look at distribution of labels we can use to make predictions on

In [2]:
df.groupBy('ChurnedInTimeBin').count().show()
df.groupBy('WillChurnInNextBin').count().show()
df.groupBy('WillChurnSoon').count().show()

+----------------+-----+
|ChurnedInTimeBin|count|
+----------------+-----+
|               1|   52|
|               0|  809|
+----------------+-----+

+------------------+-----+
|WillChurnInNextBin|count|
+------------------+-----+
|                 1|   39|
|                 0|  822|
+------------------+-----+

+-------------+-----+
|WillChurnSoon|count|
+-------------+-----+
|            1|   91|
|            0|  770|
+-------------+-----+



In [3]:
from pyspark.ml.feature import VectorAssembler

sel_cols = ["UserTimeBin", "Gender", "PaidInTimeBin", "FreeInTimeBin", "DowngradedInTimeBin", "UpgradedInTimeBin", 
            "PreviouslyDowngraded", "About", "Add Friend", "Add to Playlist", "Downgrade", "Error", "Help", "Home", "Logout", 
            "NextSong", "Roll Advert", "Save Settings", "Settings", "Thumbs Down", "Thumbs Up", "Upgrade", "SessionsInTimeBin", 
            "ArtistsInTimeBin", "DistinctSongsInTimeBin"]
assembler = VectorAssembler(inputCols = sel_cols,
                            outputCol = "features")

dataset = assembler.transform(df)
dataset = dataset.withColumnRenamed("ChurnedInTimeBin", "label")
dataset.show(n=10)

+------+-----------+------+-------------+-------------+-----+-------------------+-----------------+-----------------------+------------------+--------------------+-----+----------+---------------+------+-------------------------+---------+-----+----+----+------+--------+-----------+-------------+--------+----------------+--------------+-----------+---------+-------+-----------------+----------------+----------------------+-------------+--------------------+
|UserId|UserTimeBin|Gender|PaidInTimeBin|FreeInTimeBin|label|DowngradedInTimeBin|UpgradedInTimeBin|DaysRegisteredAtTimeBin|WillChurnInNextBin|PreviouslyDowngraded|About|Add Friend|Add to Playlist|Cancel|Cancellation Confirmation|Downgrade|Error|Help|Home|Logout|NextSong|Roll Advert|Save Settings|Settings|Submit Downgrade|Submit Upgrade|Thumbs Down|Thumbs Up|Upgrade|SessionsInTimeBin|ArtistsInTimeBin|DistinctSongsInTimeBin|WillChurnSoon|            features|
+------+-----------+------+-------------+-------------+-----+---------------

In [4]:
dataset.select('features').take(10) # Does it matter that this is a mix of dense and sparse vectors? It doesn't appear to impact anything in pyspark

[Row(features=DenseVector([4.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 5.0, 2.0, 1.0, 0.0, 0.0, 8.0, 0.0, 88.0, 1.0, 1.0, 1.0, 2.0, 5.0, 4.0, 3.0, 85.0, 87.0])),
 Row(features=DenseVector([7.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 9.0, 19.0, 3.0, 0.0, 5.0, 15.0, 4.0, 470.0, 0.0, 0.0, 2.0, 4.0, 29.0, 0.0, 5.0, 390.0, 445.0])),
 Row(features=DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 4.0, 2.0, 0.0, 0.0, 2.0, 4.0, 1.0, 120.0, 2.0, 0.0, 1.0, 1.0, 6.0, 1.0, 2.0, 112.0, 119.0])),
 Row(features=DenseVector([5.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 8.0, 11.0, 5.0, 0.0, 7.0, 11.0, 5.0, 363.0, 0.0, 0.0, 2.0, 2.0, 14.0, 0.0, 3.0, 323.0, 360.0])),
 Row(features=SparseVector(25, {0: 1.0, 1: 1.0, 3: 1.0, 14: 1.0, 15: 12.0, 16: 2.0, 20: 1.0, 22: 1.0, 23: 12.0, 24: 12.0})),
 Row(features=SparseVector(25, {0: 5.0, 1: 1.0, 3: 1.0, 8: 1.0, 12: 1.0, 13: 3.0, 14: 1.0, 15: 32.0, 16: 1.0, 19: 1.0, 20: 1.0, 22: 1.0, 23: 32.0, 24: 32.0})),
 Row(features=DenseVector([4.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1

## Build RF models predicting the above labels and compare AUCs with/without sampling

In [5]:
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 147309)

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfModel = rf.fit(trainingData)

predictions = rfModel.transform(testData)
selected = predictions.select("label", "prediction", "probability", "UserId", "UserTimeBin")

selected.show(n=10)

+-----+----------+--------------------+------+-----------+
|label|prediction|         probability|UserId|UserTimeBin|
+-----+----------+--------------------+------+-----------+
|    0|       0.0|[0.91583971938762...|   113|          4|
|    0|       0.0|[0.96450387420027...|    99|          8|
|    0|       0.0|[0.93657567928760...|   109|          5|
|    0|       0.0|[0.97050172633964...|    93|          4|
|    0|       0.0|[0.95368187592655...|100012|          1|
|    0|       0.0|[0.96370090304532...|     4|          0|
|    0|       0.0|[0.98749979136031...|   140|          5|
|    0|       0.0|[0.95461992073648...|   102|          1|
|    0|       0.0|[0.96949190205362...|    86|          9|
|    0|       0.0|[0.95297990135016...|    30|          1|
+-----+----------+--------------------+------+-----------+
only showing top 10 rows



In [6]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})

0.09157566283033391

In [7]:
evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

0.6203431372549021

In [8]:
# Sample to deal with class imbalance problem
stratified_data = dataset.sampleBy('label', fractions={0: 0.065, 1: 1.0}).cache()
stratified_data.groupby('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|   52|
|    0|   44|
+-----+-----+



In [9]:
(trainingData, testData) = stratified_data.randomSplit([0.8, 0.2], seed = 147309)

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfModel = rf.fit(trainingData)

predictions = rfModel.transform(testData)
selected = predictions.select("label", "prediction", "probability", "UserId", "UserTimeBin")

selected.show(n=10)

+-----+----------+--------------------+------+-----------+
|label|prediction|         probability|UserId|UserTimeBin|
+-----+----------+--------------------+------+-----------+
|    0|       1.0|[0.44101446492575...|   143|          3|
|    1|       0.0|[0.58067460317460...|300001|         13|
|    1|       1.0|[0.36041264291264...|200017|          4|
|    1|       1.0|[0.16587301587301...|300007|          0|
|    1|       1.0|[0.42179181929181...|200011|          7|
|    1|       0.0|[0.56007591093117...|200016|          3|
|    1|       0.0|[0.61805153180153...|   125|          5|
|    0|       0.0|[0.51407871347913...|    60|          4|
|    1|       1.0|[0.28493284493284...|100022|          3|
|    0|       0.0|[0.71807795698924...|    65|          2|
+-----+----------+--------------------+------+-----------+
only showing top 10 rows



In [10]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})

0.612342595675929

### Improved by rebalancing

In [12]:
evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

0.5079365079365079

In [13]:
# Now look at making predictions for the nextbin
dataset = dataset.withColumnRenamed("label", "ChurnedInTimeBin")
dataset = dataset.withColumnRenamed("WillChurnInNextBin", "label")
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 147309)

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfModel = rf.fit(trainingData)

predictions = rfModel.transform(testData)
#selected = predictions.select("label", "prediction", "probability", "UserId", "UserTimeBin")
#selected.show(n=10)

print("Imbalanced AUC PR: ", evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"}))
print("Imbalanced AUC ROC: ", evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"}))

print("Rebalncing, and re-running...")

stratified_data = dataset.sampleBy('label', fractions={0: 0.05, 1: 1.0}).cache()
stratified_data.groupby('label').count().show()
(trainingData, testData) = stratified_data.randomSplit([0.8, 0.2], seed = 147309)

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfModel = rf.fit(trainingData)

predictions = rfModel.transform(testData)

print("Balanced AUC PR: ", evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"}))
print("Balanced AUC ROC: ", evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"}))


Imbalanced AUC PR:  0.07208510161588808
Imbalanced AUC ROC:  0.5563530037214244
Rebalncing, and re-running...
+-----+-----+
|label|count|
+-----+-----+
|    1|   39|
|    0|   38|
+-----+-----+

Balanced AUC PR:  0.7719561688311689
Balanced AUC ROC:  0.78125


In [14]:
# Now look at making predictions for the nextbin
dataset = dataset.withColumnRenamed("label", "WillChurnInNextBin")
dataset = dataset.withColumnRenamed("WillChurnSoon", "label")
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 147309)

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfModel = rf.fit(trainingData)

predictions = rfModel.transform(testData)
#selected = predictions.select("label", "prediction", "probability", "UserId", "UserTimeBin")
#selected.show(n=10)

print("Imbalanced AUC PR: ", evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"}))
print("Imbalanced AUC ROC: ", evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"}))

print("Rebalncing, and re-running...")

stratified_data = dataset.sampleBy('label', fractions={0: 0.12, 1: 1.0}).cache()
stratified_data.groupby('label').count().show()
(trainingData, testData) = stratified_data.randomSplit([0.8, 0.2], seed = 147309)

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfModel = rf.fit(trainingData)

predictions = rfModel.transform(testData)

print("Balanced AUC PR: ", evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"}))
print("Balanced AUC ROC: ", evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"}))

Imbalanced AUC PR:  0.135615190280366
Imbalanced AUC ROC:  0.5254306808859721
Rebalncing, and re-running...
+-----+-----+
|label|count|
+-----+-----+
|    1|   91|
|    0|   96|
+-----+-----+

Balanced AUC PR:  0.6538189824124645
Balanced AUC ROC:  0.6518518518518517
