# -1) Imports

In [1]:
# Python
## modules
import time  
import sys  
import numpy as np 
import copy
## fonctions
from datetime                  import datetime
from dateutil.relativedelta    import relativedelta

# Spark 1.6
from pyspark                   import SparkContext 
from pyspark                   import SparkConf    
from pyspark.sql               import SQLContext   
from pyspark.sql               import HiveContext  

# Spark 2.0
from pyspark.sql               import SparkSession 

# Fonctions
from pyspark.sql               import Row
from pyspark.sql.types         import *

# Machine learning
from pyspark.ml                import Pipeline
from pyspark.ml.feature        import OneHotEncoder
from pyspark.ml.feature        import StringIndexer
from pyspark.ml.feature        import VectorIndexer
from pyspark.ml.feature        import VectorAssembler
from pyspark.ml.evaluation     import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

# Ancienne librairie de ML
from pyspark.mllib.evaluation  import MulticlassMetrics

# 0) settings

In [2]:
app_name    = "Random forest RF"
nb_cores    = 3
paralelisme = 3
memory      = 3
start_load  = time.time()
spark_1_6   = False
spark_2     = True

In [3]:
assert( spark_1_6 & spark_2 == False)

In [4]:
#spark 1.6
if spark_1_6:
    conf = SparkConf()
    conf.setAppName(app_name)
    conf.set("spark.mesos.coarse"             , "True")
    conf.set("spark.executor.memory"          , "%sg"%memory)
    conf.set("spark.driver.memory"            , "%sg"%memory)
    conf.set("spark.serializer"               , "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryoserializer.buffer.max", "1024m")
    conf.set("spark.driver.maxResultSize"     , "10g")
    conf.set("spark.cores.max"                , "%s"%(nb_cores))
    conf.set("spark.default.parallelism"      , "%s"%(nb_cores*paralelisme))
    conf.set("spark.storage.memoryFraction"   , "0.5")
    sc         = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)


In [5]:
if spark_2:
    spark = SparkSession.builder\
    .config("spark.app.name"                  , app_name                                   )\
    .config("spark.cores.max"                 , "%s"%(nb_cores)                            )\
    .config("spark.mesos.coarse"             , "True"                                      )\
    .config("spark.executor.memory"          , "%sg"%memory                                )\
    .config("spark.driver.memory"            , "%sg"%memory                                )\
    .config("spark.serializer"               , "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryoserializer.buffer.max", "1024m"                                     )\
    .config("spark.driver.maxResultSize"     , "10g"                                       )\
    .config("spark.cores.max"                , "%s"%(nb_cores)                             )\
    .config("spark.default.parallelism"      , "%s"%(nb_cores*paralelisme)                 )\
    .config("spark.storage.memoryFraction"   , "0.5"                                       )\
    .getOrCreate()


# 1) Structure et import du fichier

In [6]:
colY = "income"

In [7]:
adultschema = StructType([
    StructField("age"            , DoubleType() ,True),
    StructField("occupation"     , StringType() ,True),
    StructField("capital_gain"   , DoubleType() ,True),
    StructField("education"      , StringType() ,True),
    StructField("marital_status" , StringType() ,True),    
    StructField("workclass"      , StringType() ,True),
    StructField("relationship"   , StringType() ,True),
    StructField("race"           , StringType() ,True),
    StructField("sex"            , StringType() ,True),
    StructField("capital_loss"   , DoubleType() ,True),
    StructField("fnlwgt"         , DoubleType() ,True),
    StructField("hours_per_week" , DoubleType() ,True),
    StructField("native_country" , StringType() ,True),
    StructField("income"         , StringType() ,True),
])

In [8]:
data = spark.read.csv("/Users/romain/Desktop/adult.raw.txt", schema=adultschema, header=False)

In [9]:
print data.take(2)
data.count()

[Row(age=39.0, occupation=u' State-gov', capital_gain=77516.0, education=u' Bachelors', marital_status=u' Never-married', workclass=u' Adm-clerical', relationship=u' Not-in-family', race=u' White', sex=u' Male', capital_loss=2174.0, fnlwgt=0.0, hours_per_week=40.0, native_country=u' United-States', income=u' <=50K'), Row(age=50.0, occupation=u' Self-emp-not-inc', capital_gain=83311.0, education=u' Bachelors', marital_status=u' Married-civ-spouse', workclass=u' Exec-managerial', relationship=u' Husband', race=u' White', sex=u' Male', capital_loss=0.0, fnlwgt=0.0, hours_per_week=13.0, native_country=u' United-States', income=u' <=50K')]


48842

In [10]:
#si on veut vérifier les choses
if False:
    for col in data.columns:
        data.groupby(col).count().show()

# 2) gestion des  colonnes catégorielles / numériques

In [19]:
def indexStringColumns(df, cols):
    """
    Convertit les colonnes de string en numériques.
    (pbm = donne l'idée d'un ordre naturel)
    Parameters:
        df : matrice à modifier 
            dataframe
        cols : noms des colonnes à indexer
            list de chaine de caractère
            
    Return: dataframe
    """
    from pyspark.ml.feature import StringIndexer
    newdf = df
    for col in cols:
        indexer = StringIndexer(inputCol=col, outputCol=col+"-num")
        model   = indexer.fit(newdf)
        newdf   = model.transform(newdf).drop(col)
        newdf   = newdf.withColumnRenamed(col+"-num", col)
    return newdf

In [43]:
def oneHotEncodeColumns(df, cols):
    """
    Convertit une colonne contenant n modalité 
    en n colonne ne comprenant qu'une seule valeur.
    (Supprime les effets d'ordre des valeurs numériques)
    Parameters:
        df : matrice à modifier 
            dataframe
        cols : noms des colonnes à indexer
            list de chaine de caractère
            
    Return: dataframe
    """
    from pyspark.ml.feature import OneHotEncoder
    newdf = df
    for col in cols:
        onehotenc = OneHotEncoder(inputCol=col, outputCol=col+"-onehot", dropLast=False)
        newdf     = onehotenc.transform(newdf).drop(col)
        newdf     = newdf.withColumnRenamed(col+"-onehot", col)
    return newdf

# 2.1) catégories => numériques

In [105]:
colY

'income'

In [44]:
typeString = [x[0] for x in data.dtypes if x[1]=='string']

In [104]:
typeString

['occupation',
 'education',
 'marital_status',
 'workclass',
 'relationship',
 'race',
 'sex',
 'native_country',
 'income']

In [45]:
colString_without_Y = copy.copy(typeString)
colString_without_Y.remove(colY)

In [46]:
data.take(1)

[Row(age=39.0, occupation=u' State-gov', capital_gain=77516.0, education=u' Bachelors', marital_status=u' Never-married', workclass=u' Adm-clerical', relationship=u' Not-in-family', race=u' White', sex=u' Male', capital_loss=2174.0, fnlwgt=0.0, hours_per_week=40.0, native_country=u' United-States', income=u' <=50K')]

In [47]:
data2 = indexStringColumns(data, typeString)
data2.take(1)

[Row(age=39.0, capital_gain=77516.0, capital_loss=2174.0, fnlwgt=0.0, hours_per_week=40.0, occupation=4.0, education=2.0, marital_status=1.0, workclass=3.0, relationship=1.0, race=0.0, sex=0.0, native_country=0.0, income=0.0)]

# 2.2) numériques => one hot encoding

In [48]:
typeDouble = [x[0] for x in data2.dtypes if x[1]=='double']

In [49]:
data3 = oneHotEncodeColumns(data2, colString_without_Y)
data3.take(1)

[Row(age=39.0, capital_gain=77516.0, capital_loss=2174.0, fnlwgt=0.0, hours_per_week=40.0, income=0.0, occupation=SparseVector(9, {4: 1.0}), education=SparseVector(16, {2: 1.0}), marital_status=SparseVector(7, {1: 1.0}), workclass=SparseVector(15, {3: 1.0}), relationship=SparseVector(6, {1: 1.0}), race=SparseVector(5, {0: 1.0}), sex=SparseVector(2, {0: 1.0}), native_country=SparseVector(42, {0: 1.0}))]

# 2.3) plusieurs colonnes => un vecteur

In [50]:
# Choix des colonnes
features = data3.columns
features.remove(colY)
# Assembleur
assemblor = VectorAssembler(inputCols=features, outputCol="features")
# Application
data4 = assemblor.transform(data3)
data4.take(2)

[Row(age=39.0, capital_gain=77516.0, capital_loss=2174.0, fnlwgt=0.0, hours_per_week=40.0, income=0.0, occupation=SparseVector(9, {4: 1.0}), education=SparseVector(16, {2: 1.0}), marital_status=SparseVector(7, {1: 1.0}), workclass=SparseVector(15, {3: 1.0}), relationship=SparseVector(6, {1: 1.0}), race=SparseVector(5, {0: 1.0}), sex=SparseVector(2, {0: 1.0}), native_country=SparseVector(42, {0: 1.0}), features=SparseVector(107, {0: 39.0, 1: 77516.0, 2: 2174.0, 4: 40.0, 9: 1.0, 16: 1.0, 31: 1.0, 40: 1.0, 53: 1.0, 58: 1.0, 63: 1.0, 65: 1.0})),
 Row(age=50.0, capital_gain=83311.0, capital_loss=0.0, fnlwgt=0.0, hours_per_week=13.0, income=0.0, occupation=SparseVector(9, {1: 1.0}), education=SparseVector(16, {2: 1.0}), marital_status=SparseVector(7, {0: 1.0}), workclass=SparseVector(15, {2: 1.0}), relationship=SparseVector(6, {0: 1.0}), race=SparseVector(5, {0: 1.0}), sex=SparseVector(2, {0: 1.0}), native_country=SparseVector(42, {0: 1.0}), features=SparseVector(107, {0: 50.0, 1: 83311.0, 4

In [51]:
if False:
    label = [colY]

# 2.4) projection, et cleaning

In [52]:
data5 = data4.select("features", colY)
data5.schema

StructType(List(StructField(features,VectorUDT,true),StructField(income,DoubleType,true)))

In [53]:
data5.take(2)

[Row(features=SparseVector(107, {0: 39.0, 1: 77516.0, 2: 2174.0, 4: 40.0, 9: 1.0, 16: 1.0, 31: 1.0, 40: 1.0, 53: 1.0, 58: 1.0, 63: 1.0, 65: 1.0}), income=0.0),
 Row(features=SparseVector(107, {0: 50.0, 1: 83311.0, 4: 13.0, 6: 1.0, 16: 1.0, 30: 1.0, 39: 1.0, 52: 1.0, 58: 1.0, 63: 1.0, 65: 1.0}), income=0.0)]

In [54]:
data5.count()

48842

In [55]:
data6 = data5.dropDuplicates()
data6.count()

48790

# 2.5) équilibrage des classes

In [56]:
data6.groupBy(colY).count().show()

+------+-----+
|income|count|
+------+-----+
|   0.0|37109|
|   1.0|11681|
+------+-----+



In [57]:
nb_examples = 10000
income_0             = data6.filter("income == 0")
_10000_income_0      = income_0.sample(False, nb_examples/float(income_0.count()))

income_1             = data6.filter("income == 1")
_10000_income_1      = income_1.sample(False, nb_examples/float(income_1.count()))

classes_equilibrees  = _10000_income_0.union(_10000_income_1)

In [58]:
print _10000_income_0.count(), _10000_income_1.count(), classes_equilibrees.count()

9990 10033 20023


# 3) Apprentissage

In [59]:
# Division en jeu de test, et jeu d'apprentissage
(trainingData, testData) = classes_equilibrees.randomSplit([0.7, 0.3])

# 3.1) variation du nombre d'arbres

In [68]:
nb_max_arbre = 20
for ntree in range(1,nb_max_arbre) :
    rf          = RandomForestClassifier(labelCol=colY, numTrees=ntree)
    model       = rf.fit(trainingData)
    predictions = model.transform(testData)

    evaluator = MulticlassClassificationEvaluator(  labelCol      = "income" , 
                                                    predictionCol = "prediction"   , 
                                                    metricName    = "accuracy"     )
    accuracy  = evaluator.evaluate(predictions)
    error     = 1 - accuracy

    print("%s arbre => Accuracy = %g, Error = %s" % (ntree, accuracy, error))


1 arbre => Accuracy = 0.781745, Error = 0.218254630402
2 arbre => Accuracy = 0.756382, Error = 0.243617553813
3 arbre => Accuracy = 0.786417, Error = 0.213582512932
4 arbre => Accuracy = 0.787419, Error = 0.212581344902
5 arbre => Accuracy = 0.790589, Error = 0.209410979476
6 arbre => Accuracy = 0.78575, Error = 0.214249958285
7 arbre => Accuracy = 0.78041, Error = 0.219589521108
8 arbre => Accuracy = 0.784415, Error = 0.21558484899
9 arbre => Accuracy = 0.789087, Error = 0.21091273152
10 arbre => Accuracy = 0.790589, Error = 0.209410979476
11 arbre => Accuracy = 0.788754, Error = 0.211246454197
12 arbre => Accuracy = 0.786251, Error = 0.21374937427
13 arbre => Accuracy = 0.787085, Error = 0.212915067579
14 arbre => Accuracy = 0.78308, Error = 0.216919739696
15 arbre => Accuracy = 0.784582, Error = 0.215417987652
16 arbre => Accuracy = 0.785917, Error = 0.214083096946
17 arbre => Accuracy = 0.780244, Error = 0.219756382446
18 arbre => Accuracy = 0.780744, Error = 0.219255798432
19 arbr

In [64]:
evaluator.isLargerBetter()

True

# 3.2) variation de la profondeur

In [69]:
test_forets = [1, 10, 20]
test_depth  = [5, 10, 20]
for ntree in test_forets:
    for depth in test_depth:
        rf          = RandomForestClassifier(labelCol=colY, numTrees=ntree, maxDepth=depth)
        model       = rf.fit(trainingData)
        predictions = model.transform(testData)

        evaluator = MulticlassClassificationEvaluator(  labelCol      = "income" , 
                                                        predictionCol = "prediction"   , 
                                                        metricName    = "accuracy"     )
        accuracy  = 1 - evaluator.evaluate(predictions)

        print("%s arbre, depth = %s => Error = %g" % (ntree, depth, accuracy))


1 arbre, depth = 5 => Error = 0.218255
1 arbre, depth = 10 => Error = 0.200234
1 arbre, depth = 20 => Error = 0.218421
10 arbre, depth = 5 => Error = 0.209411
10 arbre, depth = 10 => Error = 0.191557
10 arbre, depth = 20 => Error = 0.175872
20 arbre, depth = 5 => Error = 0.219923
20 arbre, depth = 10 => Error = 0.194894
20 arbre, depth = 20 => Error = 0.178542


In [70]:
from time import time as now

# 3.3) Etendue de la forêt
(on peut aller prendre un café)

In [71]:
test_forets = [ 20, 30,  50]
test_depth  = [ 30] # limite à 30 de profondeurs 
for ntree in test_forets:
    for depth in test_depth:
        debut       = now()
        # modélisation :
        rf          = RandomForestClassifier(labelCol=colY, numTrees=ntree, maxDepth=depth,)
        model       = rf.fit(trainingData)
        predictions = model.transform(testData)
        # mesure de la performance :
        evaluator   = MulticlassClassificationEvaluator(  labelCol      = "income" , 
                                                          predictionCol = "prediction"   , 
                                                          metricName    = "accuracy"     )
        accuracy     = 1 - evaluator.evaluate(predictions)
        duree        = now()-debut
        print("{0} arbre, depth = {1} => Error = {2:3.4}, duree = {3:5} sec ".format (ntree, depth, accuracy, duree))
    

20 arbre, depth = 30 => Error = 0.1765, duree = 3.44e+02 sec 
30 arbre, depth = 30 => Error = 0.1727, duree = 5.49e+02 sec 
50 arbre, depth = 30 => Error = 0.172, duree = 1.23e+03 sec 


Py4JJavaError: An error occurred while calling o4493.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 3238.0 failed 4 times, most recent failure: Lost task 6.3 in stage 3238.0 (TID 290118, 169.254.61.163): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:745)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:744)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:744)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:48)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:44)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy$lzycompute(MulticlassMetrics.scala:168)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy(MulticlassMetrics.scala:168)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:87)
	at sun.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:745)


# Tentative de récupération des features importance

In [None]:
if False:
    from pyspark.ml.feature import OneHotEncoder

    newdf       = copy.copy(data)
    inputCol_1  = newdf.columns[1]
    outputCol_1 = col+"-num"
    indexer     = StringIndexer(inputCol=inputCol_1, outputCol=outputCol_1)
    model       = indexer.fit(newdf)
    newdf_1     = model.transform(newdf)
    newdf_1.select(inputCol_1, outputCol_1).dropDuplicates().show()

    inputCol_2   = outputCol_1
    outputCol_2  = col+"-onehot"
    onehotenc    = OneHotEncoder(inputCol=inputCol_2, outputCol=outputCol_2, dropLast=False)
    newdf_2      = onehotenc.transform(newdf_1)
    newdf_2.select(inputCol_1, outputCol_1, outputCol_2).dropDuplicates().show()

# 4) changement des classifieurs

In [101]:

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LogisticRegression

classifiers = { "NaiveBayes"         : NaiveBayes(labelCol=colY)         , 
                "GBTClassifier"      : GBTClassifier(labelCol=colY)      , 
                "LogisticRegression" : LogisticRegression(labelCol=colY) }
best_accuracy   = 0
best_classifier = ""
for classifierName,classifier in classifiers.iteritems():
    debut       = now()
    model       = classifier.fit(trainingData)
    predictions = model.transform(testData)

    evaluator   = MulticlassClassificationEvaluator(  labelCol      = "income" , 
                                                      predictionCol = "prediction"   , 
                                                      metricName    = "accuracy"     )
    accuracy    = evaluator.evaluate(predictions)
    error       = 1 - accuracy
    duree       = now() - debut
    print("{0:20} => Accuracy = {1:4.3}, Error = {2:4.3}, duree = {3:5.3} sec".format (classifierName ,  
                                                                                       accuracy       , 
                                                                                       error          , 
                                                                                       duree         ))
    if accuracy > best_accuracy:
        best_accuracy   = accuracy
        best_classifier = classifierName
        
print "best_classifier = %s, best_accuracy = %s"%(best_classifier, best_accuracy)


GBTClassifier        => Accuracy = 0.825, Error = 0.175, duree = 1.09e+02 sec
LogisticRegression   => Accuracy = 0.819, Error = 0.181, duree =  25.5 sec
NaiveBayes           => Accuracy = 0.59, Error = 0.41, duree =  2.72 sec
best_classifier = GBTClassifier, best_accuracy = 0.825129317537
