# Data Preprocessing for Avito Using Spark

This notebook preprocesses the Avito data with Spark, because the dataset has roughly 14 million records.

Google Cloud is the platform used.

The data comes from Kaggle: https://www.kaggle.com/c/avito-demand-prediction/data

Two notebooks from the Kaggle forums were used as references; they helped make the data wrangling simpler and cleaner.

https://www.kaggle.com/shivamb/in-depth-analysis-visualisations-avito

https://www.kaggle.com/kabure/extensive-eda-of-deal-probability

Thanks to both of them.



We first import the libraries.

The notebook runs on Google Cloud, and the dataset is read from Google Cloud Storage (the `gs://` paths below).

In [1]:
import findspark
import pyspark
import random
import pandas as pd

In [2]:
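# Only col() is used from this module; a star import would shadow built-ins such as round and sum.
from pyspark.sql.functions import col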

In [3]:
from datetime import datetime

In [4]:
import pyspark.sql.functions as func

##### SQL context created

In [6]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local','example')  # if using locally
sqlContext = SQLContext(sc)

In [7]:
pyspark.__version__

'2.4.4'

##### Data read from datastore to spark dataframe

In [8]:
df_train_label = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').option('mode','FAILFAST').option("multiLine",'true').option('escape','"').load('gs://dataproc-e3bd1f7b-2e29-4da6-a5c4-077c164fd32a-us-central1/avito/test/three_class_model_train_param.csv')

In [9]:
cols = ['region_en','category_name_en','title_len','description_len','weekend','user_type','price','param_1_len','param_2_len','param_3_len','item_seq_number','deal_class_5']


In [10]:
df_train_label = df_train_label.select(*cols)

In [11]:
df_train_label = df_train_label.withColumn("price", func.round(df_train_label["price"]).cast('integer'))

In [12]:
df_train_label.printSchema()

root
 |-- region_en: string (nullable = true)
 |-- category_name_en: string (nullable = true)
 |-- title_len: integer (nullable = true)
 |-- description_len: integer (nullable = true)
 |-- weekend: integer (nullable = true)
 |-- user_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- param_1_len: integer (nullable = true)
 |-- param_2_len: integer (nullable = true)
 |-- param_3_len: integer (nullable = true)
 |-- item_seq_number: integer (nullable = true)
 |-- deal_class_5: string (nullable = true)



In [13]:
df_train_label.count()

1503424

In [14]:
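def run_sql(statement):
    """Run a SQL statement; return the resulting DataFrame, or None on failure."""
    try:
        return sqlContext.sql(statement)
    except Exception as e:
        # Not every exception has .desc / .stackTrace, so print the exception itself.
        print(e)
        return None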

In [15]:
df_train_unlabel = sqlContext.read.parquet("gs://dataproc-e3bd1f7b-2e29-4da6-a5c4-077c164fd32a-us-central1/avito/neat_data/train_active_model.parquet")

In [16]:
df_train_unlabel = df_train_unlabel.fillna( { 'item_seq_number':0.0 } )

In [17]:
df_train_unlabel = df_train_unlabel.withColumn("price", func.round(df_train_unlabel["price"]).cast('integer'))

In [18]:
df_train_unlabel = df_train_unlabel.withColumn("item_seq_number", func.round(df_train_unlabel["item_seq_number"]).cast('integer'))

In [19]:
df_train_unlabel.printSchema()

root
 |-- region_en: string (nullable = true)
 |-- category_name_en: string (nullable = true)
 |-- title_len: integer (nullable = true)
 |-- description_len: integer (nullable = true)
 |-- weekend: string (nullable = true)
 |-- user_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- param_1_len: integer (nullable = true)
 |-- param_2_len: integer (nullable = true)
 |-- param_3_len: integer (nullable = true)
 |-- item_seq_number: integer (nullable = true)



In [20]:
df_train_unlabel.count()

14004478

In [21]:
df_train_label.createOrReplaceTempView("label")  # registerTempTable is deprecated since Spark 2.0
tbls = run_sql('show tables')
tbls.toPandas()

Unnamed: 0,database,tableName,isTemporary
0,,label,True


In [22]:
df_train_unlabel.createOrReplaceTempView("unlabel")  # registerTempTable is deprecated since Spark 2.0

In [23]:
tbls = run_sql('show tables')
tbls.toPandas()

Unnamed: 0,database,tableName,isTemporary
0,,label,True
1,,unlabel,True


In [24]:
df = run_sql("SELECT * FROM label LIMIT 5")
df.toPandas()

Unnamed: 0,region_en,category_name_en,title_len,description_len,weekend,user_type,price,param_1_len,param_2_len,param_3_len,item_seq_number,deal_class_5
0,Sverdlovsk oblast,Children's products and toys,3,7,0,Private,400,2,0,0,2,Poor
1,Samara oblast,Furniture and interior,3,7,1,Private,3000,1,0,0,19,Poor
2,Rostov oblast,Audio and video,2,17,1,Private,4000,5,0,0,9,Okay
3,Tatarstan,Children's products and toys,1,3,0,Company,2200,2,0,0,286,Good
4,Volgograd oblast,Cars,3,4,0,Private,40000,2,2,1,3,Poor


In [25]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, Normalizer, StandardScaler

In [26]:
categoricalColumns = ['region_en','user_type','category_name_en']

In [27]:
stages = []

In [28]:
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [29]:
label_stringIdx = StringIndexer(inputCol="deal_class_5", outputCol="label")
stages += [label_stringIdx]
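
To make the indexing and encoding stages concrete, here is a minimal pure-Python sketch of what they compute, assuming Spark's defaults: `StringIndexer` orders values by descending frequency (the most frequent value gets index 0), and `OneHotEncoderEstimator` drops the last category (`dropLast=True`), so a column with k distinct values becomes a vector of length k-1. The helper names `fit_string_index` and `one_hot` and the toy values are illustrative; they are not part of the notebook or of Spark.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically here only
    # to keep this sketch deterministic.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

# Toy column (hypothetical values, not the real data).
deal_class = ["Poor", "Poor", "Poor", "Good", "Good", "Okay"]
mapping = fit_string_index(deal_class)
encoded = [one_hot(mapping[v], len(mapping)) for v in deal_class]

print(mapping)
print(encoded[0], encoded[3], encoded[5])
```

This frequency ordering is consistent with the `label` column in the transformed output of this notebook, where `Poor` is 0.0, `Good` is 1.0 and `Okay` is 2.0.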

In [30]:
numericCols = ["price","description_len","title_len",'param_1_len','param_2_len','param_3_len','item_seq_number']
assemblerInputs =  numericCols + [c + "classVec" for c in categoricalColumns] 

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="unscaled_features")
stages += [assembler]

In [31]:
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features",
                        withStd=True, withMean=False)
stages += [scaler]
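
As a rough illustration of what `StandardScaler(withStd=True, withMean=False)` does to each feature: it divides by the column's standard deviation without subtracting the mean, so zeros stay zero and the sparse one-hot vectors stay sparse. The sketch below uses plain Python and the sample (n-1) standard deviation, which is how Spark's documentation describes "unit std". The function name `scale_by_std` is an assumption, and the toy prices are the first five `price` values shown earlier.

```python
from statistics import stdev

def scale_by_std(column):
    """Divide every value by the column's sample standard deviation (no centering)."""
    sd = stdev(column)
    if sd == 0.0:
        # Constant column: return zeros rather than divide by zero.
        return [0.0 for _ in column]
    return [x / sd for x in column]

prices = [400, 3000, 4000, 2200, 40000]
scaled = scale_by_std(prices)

print(scaled)
print(stdev(scaled))  # close to 1.0 after scaling
```

Because nothing is subtracted, the scaled values keep their sign and their ordering; only the unit changes.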

In [32]:
cols = df_train_label.columns

In [33]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)

# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(df_train_label)
dataset = pipelineModel.transform(df_train_label)

# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
dataset.limit(10).toPandas()

Unnamed: 0,label,features,region_en,category_name_en,title_len,description_len,weekend,user_type,price,param_1_len,param_2_len,param_3_len,item_seq_number,deal_class_5
0,0.0,"(9.05548541419178e-05, 0.17379080716567827, 1....",Sverdlovsk oblast,Children's products and toys,3,7,0,Private,400,2,0,0,2,Poor
1,0.0,"(0.0006791614060643836, 0.17379080716567827, 1...",Samara oblast,Furniture and interior,3,7,1,Private,3000,1,0,0,19,Poor
2,2.0,"(0.000905548541419178, 0.422063388830933, 1.08...",Rostov oblast,Audio and video,2,17,1,Private,4000,5,0,0,9,Okay
3,1.0,"(0.0004980516977805479, 0.07448177449957641, 0...",Tatarstan,Children's products and toys,1,3,0,Company,2200,2,0,0,286,Good
4,0.0,"(0.00905548541419178, 0.09930903266610187, 1.6...",Volgograd oblast,Cars,3,4,0,Private,40000,2,2,1,3,Poor
5,1.0,"(0.00029430327596123286, 0.07448177449957641, ...",Tatarstan,Children's products and toys,2,3,0,Private,1300,2,0,0,9,Good
6,0.0,"(0.0024902584889027394, 0.5213724214970349, 2....",Nizhny Novgorod oblast,Repair and construction,5,21,0,Private,11000,3,0,0,125,Poor
7,1.0,"(0.00011319356767739725, 0.09930903266610187, ...",Perm Krai,"Clothing, shoes, accessories",2,4,0,Private,500,2,1,1,61,Good
8,0.0,"(0.00011319356767739725, 0.12413629083262734, ...",Orenburg oblast,"Clothing, shoes, accessories",1,5,0,Private,500,2,3,3,85,Poor
9,0.0,"(9.05548541419178e-05, 0.27309983983178016, 2....",Nizhny Novgorod oblast,Children's clothing and shoes,4,11,0,Company,400,2,1,1,136,Poor


--------------------------- SSL -----------------------

In [34]:
stages = []

categoricalColumns = ['region_en','user_type','category_name_en']

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]
    
numericCols = ["price","description_len","title_len",'param_1_len','param_2_len','param_3_len','item_seq_number']
assemblerInputs =  numericCols + [c + "classVec" for c in categoricalColumns] 

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="unscaled_features")
stages += [assembler]

scaler = StandardScaler(inputCol="unscaled_features", outputCol="features",
                        withStd=True, withMean=False)
stages += [scaler]

cols = df_train_unlabel.columns

In [35]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)

# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(df_train_unlabel)
dataset = pipelineModel.transform(df_train_unlabel)

# Keep relevant columns
selectedcols = [ "features"] + cols
dataset = dataset.select(selectedcols)
dataset.limit(10).toPandas()

Unnamed: 0,features,region_en,category_name_en,title_len,description_len,weekend,user_type,price,param_1_len,param_2_len,param_3_len,item_seq_number
0,"(6.932371291189339e-05, 0.45175681492698655, 3...",Udmurtia,"Clothing, shoes, accessories",36,156,0,Private,400,14,16,10,16
1,"(0.0007625608420308272, 0.5762795267337841, 3....",Irkutsk oblast,Desktop computers,41,199,0,Private,4400,0,0,0,115
2,"(0.00010398556936784008, 1.2220601019178738, 3...",Krasnodar Krai,Phones,46,422,0,Shop,600,0,0,0,1367
3,"(0.0006065824879790671, 0.16216911305071313, 1...",Krasnodar Krai,"Clothing, shoes, accessories",22,56,0,Company,3500,14,5,2,144
4,"(0.0002253020669636535, 0.19691963727586592, 1...",Tyumen oblast,"Clothing, shoes, accessories",19,68,0,Company,1300,10,0,0,285
5,"(0.00020797113873568016, 0.48650733915213934, ...",Krasnodar Krai,"Clothing, shoes, accessories",37,168,0,Company,1200,10,0,0,78
6,"(6.932371291189339e-05, 0.3098588410076126, 2....",Tatarstan,Children's clothing and shoes,28,107,0,Private,400,11,5,2,40
7,"(0.0015597835405176013, 0.9874940633980924, 2....",Perm Krai,Laptops,25,341,0,Company,9000,0,0,0,68
8,"(5.199278468392004e-05, 0.07529280248783109, 1...",Tyumen oblast,"Clothing, shoes, accessories",15,26,0,Private,300,10,0,0,58
9,"(0.0017330928227973347, 1.3581663217997224, 4....",Samara oblast,Collecting,48,469,0,Company,10000,6,0,0,26


In [36]:
stages = []

categoricalColumns = ['region_en','user_type','category_name_en']

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]
    
label_stringIdx = StringIndexer(inputCol="deal_class_5", outputCol="label")
stages += [label_stringIdx]

numericCols = ["price","description_len","title_len",'param_1_len','param_2_len','param_3_len','item_seq_number']
assemblerInputs =  numericCols + [c + "classVec" for c in categoricalColumns] 

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="unscaled_features")
stages += [assembler]

scaler = StandardScaler(inputCol="unscaled_features", outputCol="features",
                        withStd=True, withMean=False)
stages += [scaler]

cols = df_train_label.columns

In [37]:
# Create a Pipeline.
pipeline_label = Pipeline(stages=stages)

# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModelLabel = pipeline_label.fit(df_train_label)
datasetLabel = pipelineModelLabel.transform(df_train_label)

# Keep relevant columns
selectedcols = ["label", "features"] + cols
datasetLabel = datasetLabel.select(selectedcols)
datasetLabel.limit(10).toPandas()

Unnamed: 0,label,features,region_en,category_name_en,title_len,description_len,weekend,user_type,price,param_1_len,param_2_len,param_3_len,item_seq_number,deal_class_5
0,0.0,"(9.05548541419178e-05, 0.17379080716567827, 1....",Sverdlovsk oblast,Children's products and toys,3,7,0,Private,400,2,0,0,2,Poor
1,0.0,"(0.0006791614060643836, 0.17379080716567827, 1...",Samara oblast,Furniture and interior,3,7,1,Private,3000,1,0,0,19,Poor
2,2.0,"(0.000905548541419178, 0.422063388830933, 1.08...",Rostov oblast,Audio and video,2,17,1,Private,4000,5,0,0,9,Okay
3,1.0,"(0.0004980516977805479, 0.07448177449957641, 0...",Tatarstan,Children's products and toys,1,3,0,Company,2200,2,0,0,286,Good
4,0.0,"(0.00905548541419178, 0.09930903266610187, 1.6...",Volgograd oblast,Cars,3,4,0,Private,40000,2,2,1,3,Poor
5,1.0,"(0.00029430327596123286, 0.07448177449957641, ...",Tatarstan,Children's products and toys,2,3,0,Private,1300,2,0,0,9,Good
6,0.0,"(0.0024902584889027394, 0.5213724214970349, 2....",Nizhny Novgorod oblast,Repair and construction,5,21,0,Private,11000,3,0,0,125,Poor
7,1.0,"(0.00011319356767739725, 0.09930903266610187, ...",Perm Krai,"Clothing, shoes, accessories",2,4,0,Private,500,2,1,1,61,Good
8,0.0,"(0.00011319356767739725, 0.12413629083262734, ...",Orenburg oblast,"Clothing, shoes, accessories",1,5,0,Private,500,2,3,3,85,Poor
9,0.0,"(9.05548541419178e-05, 0.27309983983178016, 2....",Nizhny Novgorod oblast,Children's clothing and shoes,4,11,0,Company,400,2,1,1,136,Poor


In [38]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = datasetLabel.randomSplit([0.8, 0.2], seed=100)
print(trainingData.count())
print(testData.count())

1202921
300503


In [39]:
# print() on a DataFrame shows only its schema; call .show() to display the rows.
print(trainingData.groupBy('label').count().orderBy('count'))
print(testData.groupBy('label').count().orderBy('count'))

DataFrame[label: double, count: bigint]
DataFrame[label: double, count: bigint]


In [40]:
trainingData.groupBy('label').count().orderBy('count').show()

+-----+-------+
|label|  count|
+-----+-------+
|  2.0|  66149|
|  1.0| 130270|
|  0.0|1006502|
+-----+-------+



In [41]:
testData.groupBy('label').count().orderBy('count').show()

+-----+------+
|label| count|
+-----+------+
|  2.0| 16517|
|  1.0| 32774|
|  0.0|251212|
+-----+------+
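
The counts above show a strong class imbalance. A quick way to quantify it is to compute each class's share of the split, plus the accuracy a model would reach by always predicting the most frequent class. The sketch below does this in plain Python from the counts printed above; the helper name `class_shares` is made up for illustration.

```python
def class_shares(counts):
    """Return each label's fraction of the total row count."""
    total = sum(counts.values())
    return {label: n / total for label, n in counts.items()}

# Counts copied from the two show() outputs above.
train_counts = {0.0: 1006502, 1.0: 130270, 2.0: 66149}
test_counts = {0.0: 251212, 1.0: 32774, 2.0: 16517}

train_shares = class_shares(train_counts)
test_shares = class_shares(test_counts)

# Accuracy of a trivial model that always predicts the most frequent label.
majority_baseline = max(test_shares.values())

for label in sorted(train_counts):
    print(label, round(train_shares[label], 4), round(test_shares[label], 4))
print("majority baseline accuracy:", round(majority_baseline, 4))
```

Any accuracy figure on this test set is best read against that baseline.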



-------------Section 1-------------------------

-----------Without Resampling--------------------------

In [42]:
from pyspark.ml.classification import RandomForestClassifier

In [46]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

In [43]:
from pyspark.ml.classification import DecisionTreeClassifier

In [44]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features",  maxDepth = 30)
dtModel = dt.fit(trainingData)

In [45]:
dtPredictions = dtModel.transform(testData)

In [47]:
predictionAndLabels = dtPredictions.select("prediction","label").rdd

In [48]:
multi_metrics = MulticlassMetrics(predictionAndLabels)

In [49]:
print(multi_metrics.confusionMatrix())

DenseMatrix([[232137.,  13002.,   6073.],
             [ 25748.,   5072.,   1954.],
             [ 12517.,   2009.,   1991.]])


In [53]:
# Without a label argument, recall() and precision() return overall values that equal accuracy.
print("Recall = %s" % multi_metrics.recall())


Recall = 0.7959987088315258


In [54]:
print("Precision = %s" % multi_metrics.precision())


Precision = 0.7959987088315258
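
The recall and precision printed above are identical because, without a label argument, `MulticlassMetrics` reports overall values that reduce to accuracy. Per-class figures are more informative on imbalanced data. As a sketch, they can be derived directly from the confusion matrix printed earlier, assuming rows are actual labels and columns are predicted labels (the row sums match the test-set class counts, which supports that reading). The function name `per_class_metrics` is illustrative.

```python
def per_class_metrics(matrix):
    """Return (precision, recall) lists from a square confusion matrix.

    Rows are actual labels, columns are predicted labels.
    """
    n = len(matrix)
    precision, recall = [], []
    for k in range(n):
        true_pos = matrix[k][k]
        actual_total = sum(matrix[k])                          # row sum
        predicted_total = sum(matrix[i][k] for i in range(n))  # column sum
        recall.append(true_pos / actual_total if actual_total else 0.0)
        precision.append(true_pos / predicted_total if predicted_total else 0.0)
    return precision, recall

# Decision-tree confusion matrix copied from the output above.
dt_matrix = [
    [232137, 13002, 6073],
    [25748, 5072, 1954],
    [12517, 2009, 1991],
]

precision, recall = per_class_metrics(dt_matrix)
accuracy = sum(dt_matrix[k][k] for k in range(3)) / sum(map(sum, dt_matrix))

for label, (p, r) in enumerate(zip(precision, recall)):
    print(f"label {label}: precision={p:.4f} recall={r:.4f}")
print(f"accuracy={accuracy:.4f}")
```

In PySpark the same per-class values should be available by passing a label, for example `multi_metrics.recall(2.0)`.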


In [55]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxDepth = 28,numTrees=10)
rfModel = rf.fit(trainingData)

In [56]:
rfPredictions = rfModel.transform(testData)

In [57]:
rfPredictionAndLabels = rfPredictions.select("prediction","label").rdd

In [58]:
rf_multi_metrics = MulticlassMetrics(rfPredictionAndLabels)

In [59]:
print(rf_multi_metrics.confusionMatrix())

DenseMatrix([[2.5015e+05, 7.9200e+02, 2.7000e+02],
             [3.1391e+04, 1.2210e+03, 1.6200e+02],
             [1.5758e+04, 5.0700e+02, 2.5200e+02]])


In [61]:
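# With no label argument these are overall values, so all four lines match;
# pass a label, e.g. recall(2.0), to get a per-class value.
print("Recall = %s" % rf_multi_metrics.recall())
print("Precision = %s" % rf_multi_metrics.precision())
print("F1 measure = %s" % rf_multi_metrics.fMeasure())
print("Accuracy = %s" % rf_multi_metrics.accuracy)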

Recall = 0.8373393942822535
Precision = 0.8373393942822535
F1 measure = 0.8373393942822535
Accuracy = 0.8373393942822535


In [63]:
print("F1 = {}".format(rf_multi_metrics.fMeasure()))
# Precision
print("Precision = {}".format(rf_multi_metrics.precision()))
# Recall
print("Recall = {}".format(rf_multi_metrics.recall()))

F1 = 0.8373393942822535
Precision = 0.8373393942822535
Recall = 0.8373393942822535


In [64]:
rf30 = RandomForestClassifier(labelCol="label", featuresCol="features", maxDepth = 28,numTrees=30)
rfModel30 = rf30.fit(trainingData)

In [65]:
rf30Predictions = rfModel30.transform(testData)

In [66]:
rf30predictionAndLabels = rf30Predictions.select("prediction","label").rdd

In [67]:
rf30_multi_metrics = MulticlassMetrics(rf30predictionAndLabels)

In [68]:
print(rf30_multi_metrics.confusionMatrix())

DenseMatrix([[2.50452e+05, 6.41000e+02, 1.19000e+02],
             [3.15510e+04, 1.14700e+03, 7.60000e+01],
             [1.59820e+04, 4.51000e+02, 8.40000e+01]])


In [69]:
print("Recall = %s" % rf30_multi_metrics.recall())
print("Precision = %s" % rf30_multi_metrics.precision())
print("F1 measure = %s" % rf30_multi_metrics.fMeasure())
print("Accuracy = %s" % rf30_multi_metrics.accuracy)

Recall = 0.8375390595102212
Precision = 0.8375390595102212
F1 measure = 0.8375390595102212
Accuracy = 0.8375390595102212


------------------------------------------------

--------------------------Section 2--------------------------

------------With Resampling (Under Sampling)----------------------

In [None]:
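def resample(base_features, ratio, class_field, base_class):
    """Undersample all rows outside base_class to about ratio * (base_class count)."""
    pos = base_features.filter(col(class_field) == base_class)
    neg = base_features.filter(col(class_field) != base_class)
    total_pos = pos.count()
    total_neg = neg.count()
    # Fraction of the other rows to keep; sample() is random, so the
    # resulting size only approximately matches the requested ratio.
    fraction = float(total_pos * ratio) / float(total_neg)
    sampled = neg.sample(False, fraction)
    return sampled.union(pos)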

In [85]:
poor = trainingData.filter(col('label') == 0.0)

In [86]:
poor.count()

1006502

In [87]:
okay = trainingData.filter(col('label') == 2.0)

In [88]:
okay.count()

66149

In [89]:
good = trainingData.filter(col('label') == 1.0)

In [90]:
good.count()

130270

In [91]:
# Keep about 13% of the 'poor' rows so the class roughly matches 'good' (130270 rows).
sampled_poor = poor.sample(False,0.12963519528)

In [92]:
sampled_poor.count()

130415
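
The fraction passed to `sample` above can be derived rather than hard-coded. A minimal sketch, assuming the goal is to bring a class to roughly a target row count: the fraction is the target divided by the class's current count. `sampling_fraction` is a hypothetical helper, and the counts are the ones printed in the cells above.

```python
def sampling_fraction(class_count, target_count):
    """Fraction to pass to DataFrame.sample so the class ends up near target_count rows."""
    if class_count <= 0:
        raise ValueError("class_count must be positive")
    return target_count / class_count

poor_count, good_count, okay_count = 1006502, 130270, 66149

# Undersample 'poor' down to the size of 'good'.
poor_fraction = sampling_fraction(poor_count, good_count)
print(round(poor_fraction, 6))

# Expected number of rows kept; the actual count varies from run to run
# because sample() decides row by row at random.
print(round(poor_fraction * poor_count))
```

Passing a `seed` as the third argument of `sample`, as the notebook already does for `randomSplit`, would make the draw repeatable.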

In [93]:
sampled_okay = okay.union(okay)  # duplicate every 'okay' row once to double the class

In [94]:
sampled_okay.count()

132298

In [95]:
sample_df = good.union(sampled_okay)

In [96]:
sample_df = sample_df.union(sampled_poor)

In [97]:
sample_df.count()

392983

In [98]:
train_sample = sample_df

In [75]:
from pyspark.ml.classification import RandomForestClassifier

In [76]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

In [103]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxDepth = 28,numTrees=30)
rfModel = rf.fit(train_sample)

In [104]:
rfPredictions = rfModel.transform(testData)

In [105]:
rfPredictionAndLabels = rfPredictions.select("prediction","label").rdd
rf_multi_metrics = MulticlassMetrics(rfPredictionAndLabels)
print(rf_multi_metrics.confusionMatrix())

DenseMatrix([[154547.,  56527.,  40138.],
             [  6584.,  17658.,   8532.],
             [  1954.,   4440.,  10123.]])


In [106]:
print("Recall = %s" % rf_multi_metrics.recall())
print("Precision = %s" % rf_multi_metrics.precision())
print("F1 measure = %s" % rf_multi_metrics.fMeasure())
print("Accuracy = %s" % rf_multi_metrics.accuracy)

Recall = 0.6067426947484718
Precision = 0.6067426947484718
F1 measure = 0.6067426947484718
Accuracy = 0.6067426947484718
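
Overall accuracy drops after undersampling, but accuracy is dominated by the majority class. One way to see what changed is macro-averaged recall, the unweighted mean of per-class recall. The sketch below computes it for two confusion matrices copied from this notebook: the 30-tree forest without resampling (Section 1) and the undersampled forest above (Section 2). It assumes rows are actual labels, and `macro_recall` is an illustrative helper name.

```python
def macro_recall(matrix):
    """Return (macro-averaged recall, per-class recalls); rows are actual labels."""
    recalls = [row[k] / sum(row) for k, row in enumerate(matrix)]
    return sum(recalls) / len(recalls), recalls

# 30 trees, no resampling (Section 1).
no_resampling = [
    [250452, 641, 119],
    [31551, 1147, 76],
    [15982, 451, 84],
]
# 30 trees, undersampled training data (Section 2).
undersampled = [
    [154547, 56527, 40138],
    [6584, 17658, 8532],
    [1954, 4440, 10123],
]

for name, matrix in [("no resampling", no_resampling), ("undersampled", undersampled)]:
    macro, recalls = macro_recall(matrix)
    print(name, "macro recall =", round(macro, 4),
          "per class =", [round(r, 4) for r in recalls])
```

Read this way, the resampled model gives up recall on the majority class in exchange for much higher recall on the two minority classes.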


----------------------------

------------------Section 3----------------------

--------------Resampling-----------------------

In [41]:
poor = trainingData.filter(col('label') == 0.0)
poor.count()

1006502

In [42]:
okay = trainingData.filter(col('label') == 2.0)
okay.count()

66149

In [44]:
good = trainingData.filter(col('label') == 1.0)
good.count()

130270

In [46]:
sampled_poor = poor.sample(False,0.8)
sampled_poor.count()

805865

In [57]:
# Oversample 'okay' toward the size of sampled_poor by repeatedly sampling and doubling the rows.
sampled_okay = okay.sample(False,0.9)
sampled_okay = okay.union(sampled_okay)
sampled_okay = sampled_okay.sample(False,0.9)
sampled_okay = sampled_okay.union(sampled_okay)
sampled_okay = sampled_okay.sample(False,0.9)
sampled_okay = sampled_okay.union(sampled_okay)
sampled_okay = sampled_okay.sample(True,0.9)
sampled_okay = sampled_okay.union(sampled_okay)
sampled_okay = sampled_okay.sample(True,0.58)
sampled_okay = sampled_okay.union(sampled_okay)
sampled_okay.count()

853300

In [58]:
sampled_okay = sampled_okay.sample(True,0.95)
sampled_okay.count()

811258

In [59]:
sampled_okay = sampled_okay.sample(True,0.99)
sampled_okay.count()

803678
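
The chain of `sample` and `union` calls above grows the class roughly geometrically. As a rough check, the expected size multiplier is the product of the per-step factors: unioning the original with a sampled copy contributes 1 + fraction, a sample contributes its fraction, and a self-union contributes 2. The sketch works that through for the `okay` class using the fractions and counts from the cells above. It is plain arithmetic, and the variable names are illustrative.

```python
import math

okay_count = 66149

# Expected growth factors for the In [57] cell, in order.
steps = [
    1 + 0.9,   # okay.union(okay.sample(False, 0.9))
    0.9, 2,    # sample(False, 0.9), then self-union
    0.9, 2,    # sample(False, 0.9), then self-union
    0.9, 2,    # sample(True, 0.9), then self-union
    0.58, 2,   # sample(True, 0.58), then self-union
]
multiplier = math.prod(steps)
expected_rows = okay_count * multiplier
print(round(multiplier, 4), round(expected_rows))

# A single fraction with the same expected effect, aimed at the size
# of sampled_poor (805865 rows).
target_rows = 805865
single_fraction = target_rows / okay_count
print(round(single_fraction, 3))
```

The expected size is close to the 853300 rows counted above. A single `sample(True, fraction)` call with a fraction of this size might replace the whole chain, since sampling with replacement is understood to accept fractions above 1, but that should be checked against the Spark version in use.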

In [55]:
# Oversample 'good' by repeatedly sampling and doubling the rows.
sampled_good = good.sample(False,0.9)
sampled_good = good.union(sampled_good)
sampled_good = sampled_good.sample(False,0.9)
sampled_good = sampled_good.union(sampled_good)
sampled_good = sampled_good.sample(False,0.9)
sampled_good = sampled_good.union(sampled_good)
sampled_good.count()

802078

In [61]:
sample_df = sampled_good.union(sampled_okay)
sample_df = sample_df.union(sampled_poor)
sample_df.count()

2411621

In [62]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

In [63]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxDepth = 28,numTrees=25)
rfModel = rf.fit(sample_df)

In [64]:
rfPredictions = rfModel.transform(testData)

In [65]:
rfPredictionAndLabels = rfPredictions.select("prediction","label").rdd
rf_multi_metrics = MulticlassMetrics(rfPredictionAndLabels)
print(rf_multi_metrics.confusionMatrix())

DenseMatrix([[162226.,  53819.,  35167.],
             [  7288.,  17460.,   8026.],
             [  2416.,   4536.,   9565.]])


In [66]:
print("Recall = %s" % rf_multi_metrics.recall())
print("Precision = %s" % rf_multi_metrics.precision())
print("F1 measure = %s" % rf_multi_metrics.fMeasure())
print("Accuracy = %s" % rf_multi_metrics.accuracy)

Recall = 0.6297807343021534
Precision = 0.6297807343021534
F1 measure = 0.6297807343021534
Accuracy = 0.6297807343021534
