In [None]:
# %load pyspark_init_mac.py
#
# This configuration works for Spark on macOS using homebrew
#
import os, sys
# set OS environment variable
os.environ["SPARK_HOME"] = '/usr/local/Cellar/apache-spark/2.2.0/libexec'
# add Spark library to Python
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], 'python'))

# import package
import pyspark
from pyspark.context import SparkContext, SparkConf

import atexit
def stop_my_spark():
    sc.stop()
    del(sc)

# Register exit    
atexit.register(stop_my_spark)

# Configure and start Spark ... but only once.
if not 'sc' in globals():
    conf = SparkConf()
    conf.setAppName('MyFirstSpark') ## you may want to change this
    conf.setMaster('local[2]')
    sc = SparkContext(conf=conf)
    print "Launched Spark version %s with ID %s" % (sc.version, sc.applicationId)



In [2]:
print sc.version

1.6.1


Let's see if the data file exists

In [3]:
%%sh
hdfs dfs -ls /data/apache-spark/mllib

Found 18 items
drwxr-xr-x   - pmolnar hdfs          0 2017-10-27 10:39 /data/apache-spark/mllib/als
-rw-r--r--   3 pmolnar hdfs      63973 2017-10-27 10:39 /data/apache-spark/mllib/gmm_data.txt
-rw-r--r--   3 pmolnar hdfs         72 2017-10-27 10:39 /data/apache-spark/mllib/kmeans_data.txt
-rw-r--r--   3 pmolnar hdfs         24 2017-10-27 10:39 /data/apache-spark/mllib/pagerank_data.txt
-rw-r--r--   3 pmolnar hdfs        164 2017-10-27 10:39 /data/apache-spark/mllib/pic_data.txt
drwxr-xr-x   - pmolnar hdfs          0 2017-10-27 10:39 /data/apache-spark/mllib/ridge-data
-rw-r--r--   3 pmolnar hdfs     104736 2017-10-27 10:39 /data/apache-spark/mllib/sample_binary_classification_data.txt
-rw-r--r--   3 pmolnar hdfs         68 2017-10-27 10:39 /data/apache-spark/mllib/sample_fpgrowth.txt
-rw-r--r--   3 pmolnar hdfs       1798 2017-10-27 10:39 /data/apache-spark/mllib/sample_isotonic_regression_libsvm_data.txt
-rw-r--r--   3 pmolnar hdfs        120 2017-10-27 10:39 /data/apache-spark/mllib

Now, we need to import a couple of modules

# Build the Model

In [6]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

## LIBSVM data format
When I looked into LIBSVM data file for the first time, I got a little bit confused. But then I found its design is a brilliant idea.

LIBSVM data files look like below:
```
-1 1:-766 2:128 3:0.140625 4:0.304688 5:0.234375 6:0.140625 7:0.304688 8:0.234375
-1 1:-726 2:131 3:0.129771 4:0.328244 5:0.229008 6:0.129771 7:0.328244 8:0.229008
......
```
The first element of each row is the *label*, or we can say it's the *response value*. The labels can be either discrete or continuous. Normally, the labels will be discrete if we're working on classification, and continuous if we're trying to do regression. Following the labels are the *feature indices* and the *feature values* in format `index:value` (Please note that the index starts from `1` instead of `0` in LIBSVM data files, i.e., the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based [4]).

Sometimes we may find 'weird' LIBSVM data like below
```
-1 3:1 11:1 14:1 19:1 39:1 42:1 55:1 64:1 67:1 73:1 75:1 76:1 80:1 83:1 
-1 3:1 6:1 17:1 27:1 35:1 40:1 57:1 63:1 69:1 73:1 74:1 76:1 81:1 103:1 
-1 1:1 7:1 16:1 22:1 36:1 42:1 56:1 62:1 67:1 73:1 74:1 76:1 79:1 83:1 
```
The indices in it are not continuous. What's wrong? Actually the missing features are all 0. For example, in the first row, feature 1, 2, 4-10, 12-13, ... are all zero-values. This design is partially for the sake of memory usage. It would help improve the efficiency of the our programs if the data are sparse (containing quite many zero-values).


## Data Type "Labeled Point"

The data loaded by method `loadLibSVMFile` will be saved as `Labeled Points`. What is it?

MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. A training example used in supervised learning is called a “labeled point” in MLlib [4].



In [7]:
data = MLUtils.loadLibSVMFile(sc, '/data/apache-spark/mllib/sample_libsvm_data.txt')

In [71]:
print type(data)
f = data.take(1)[0]

<class 'pyspark.rdd.PipelinedRDD'>


In [78]:
print type(f)
f

<class 'pyspark.mllib.regression.LabeledPoint'>


LabeledPoint(0.0, (692,[127,128,129,130,131,154,155,156,157,158,159,181,182,183,184,185,186,187,188,189,207,208,209,210,211,212,213,214,215,216,217,235,236,237,238,239,240,241,242,243,244,245,262,263,264,265,266,267,268,269,270,271,272,273,289,290,291,292,293,294,295,296,297,300,301,302,316,317,318,319,320,321,328,329,330,343,344,345,346,347,348,349,356,357,358,371,372,373,374,384,385,386,399,400,401,412,413,414,426,427,428,429,440,441,442,454,455,456,457,466,467,468,469,470,482,483,484,493,494,495,496,497,510,511,512,520,521,522,523,538,539,540,547,548,549,550,566,567,568,569,570,571,572,573,574,575,576,577,578,594,595,596,597,598,599,600,601,602,603,604,622,623,624,625,626,627,628,629,630,651,652,653,654,655,656,657],[51.0,159.0,253.0,159.0,50.0,48.0,238.0,252.0,252.0,252.0,237.0,54.0,227.0,253.0,252.0,239.0,233.0,252.0,57.0,6.0,10.0,60.0,224.0,252.0,253.0,252.0,202.0,84.0,252.0,253.0,122.0,163.0,252.0,252.0,252.0,253.0,252.0,252.0,96.0,189.0,253.0,167.0,51.0,238.0,253.0,253.0,190.0,

In [74]:
f.label, type(f.label)

(0.0, float)

In [77]:
type(f.features), f.features.toArray()

(pyspark.mllib.linalg.SparseVector,
 array([   0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
           0.,    0.,    0.,    0.,    0.

In [12]:
data.sample(False, 0.1).count()

9

In [9]:
(trainingData, validationData, testData) = data.randomSplit([70, 10, 20])
print trainingData.count(), validationData.count(), testData.count()

 70 9 21


## How many trees we should have (`numTrees`)

This argument determines how many trees we build in a random forest. Increasing the number of trees will decrease the variance in predictions, and improve the model’s test accuracy. At the same time, training time will increaseroughly linearly in the number of trees.

Personally, I would recommend 400-500 as a 'safe' choice.

## How many features to use (`featureSubsetStrategy`)

As we mentioned above, the very unique charactristic of *random forest* is that in each split of the tree model we use a subset of features (predictors) instead of using all of them. Then, how many features should we use in each split? we can set `featureSubsetStrategy="auto"` of course so that the function we called will help us configure automatically, but we may want to tune it in some situations. Decreasing this number will speed up training, but can sometimes impact performance if too low [2].

For the function `RandomForest.trainClassifier` in PySaprk , argument `featureSubsetStrategy` supports“auto” (default), “all”, “sqrt”, “log2”, “onethird”. If “auto” is set, this parameter is set based on numTrees: if numTrees == 1, set to “all”; if numTrees > 1 (forest) set to “sqrt” [3].

Usually, given the number of features is `p`, we use `p/3` features in each model when building a random forest for regression, and use `sqrt(p)` features in each model if a random forest is built for classification [1].

## What is 'gini' --- the measures used to grow the trees (`impurity`)

`impurity` argument helps determine the criterion used for information gain calculation, and in PySpark the supported values are “gini” (recommended) or “entropy” [3]. Since random forest is some kind of *greedy algorithm*, we can say that `impurity` helps determine what is the objective function when the algorithm makes each decisions.

The most commonly used measures for this are just **Gini Index** and *Cross-entropy*, corresponding to the two supported values for `impurity` argument.


In [81]:
# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

In [86]:
f.features.size

692

In [84]:
print model.toDebugString()

TreeEnsembleModel classifier with 3 trees

  Tree 0:
    If (feature 433 <= 0.0)
     If (feature 574 <= 253.0)
      Predict: 0.0
     Else (feature 574 > 253.0)
      Predict: 1.0
    Else (feature 433 > 0.0)
     Predict: 1.0
  Tree 1:
    If (feature 346 <= 4.0)
     If (feature 568 <= 47.0)
      If (feature 416 <= 0.0)
       Predict: 1.0
      Else (feature 416 > 0.0)
       Predict: 0.0
     Else (feature 568 > 47.0)
      Predict: 0.0
    Else (feature 346 > 4.0)
     Predict: 0.0
  Tree 2:
    If (feature 605 <= 0.0)
     If (feature 379 <= 0.0)
      Predict: 0.0
     Else (feature 379 > 0.0)
      Predict: 1.0
    Else (feature 605 > 0.0)
     If (feature 243 <= 0.0)
      Predict: 1.0
     Else (feature 243 > 0.0)
      If (feature 154 <= 0.0)
       If (feature 624 <= 253.0)
        Predict: 0.0
       Else (feature 624 > 253.0)
        Predict: 1.0
      Else (feature 154 > 0.0)
       Predict: 0.0



# Evaluation
Let's apply our model on the test set, and see how well it performs

In [87]:
predictions = model.predict(testData.map(lambda x: x.features))
predictions.take(4)

[1.0, 0.0, 0.0, 1.0]

In [88]:
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
labelsAndPredictions.take(4)

[(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (1.0, 1.0)]

In [10]:
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print 'Test Error = ' + str(testErr)
print 'Learned classification forest model:'
print model.toDebugString() 

Test Error = 0.0967741935484
Learned classification forest model:
TreeEnsembleModel classifier with 3 trees

  Tree 0:
    If (feature 456 <= 0.0)
     If (feature 213 <= 254.0)
      Predict: 1.0
     Else (feature 213 > 254.0)
      Predict: 0.0
    Else (feature 456 > 0.0)
     Predict: 0.0
  Tree 1:
    If (feature 540 <= 65.0)
     Predict: 1.0
    Else (feature 540 > 65.0)
     Predict: 0.0
  Tree 2:
    If (feature 385 <= 0.0)
     If (feature 439 <= 0.0)
      Predict: 1.0
     Else (feature 439 > 0.0)
      Predict: 0.0
    Else (feature 385 > 0.0)
     Predict: 0.0



In [15]:
p = testData.take(1)[0]

In [16]:
type(p)

pyspark.mllib.regression.LabeledPoint

In [20]:
p.features.toArray()

array([   0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,    0.,
          0.,    0.,

# World Bank Data Example
Source: https://archive.ics.uci.edu/ml/datasets/bank+marketing

Let's see where out data file is stored

In [29]:
%%sh
hdfs dfs -ls -R /data/apache-spark/archive.ics.uci.edu

drwxr-xr-x   - pmolnar hdfs          0 2017-10-27 11:45 /data/apache-spark/archive.ics.uci.edu/BankMarketing
-rw-r--r--   3 pmolnar hdfs        271 2017-10-27 11:45 /data/apache-spark/archive.ics.uci.edu/BankMarketing/README.txt
-rw-r--r--   3 pmolnar hdfs    5834924 2017-10-27 11:45 /data/apache-spark/archive.ics.uci.edu/BankMarketing/bank-additional-full.csv
-rw-r--r--   3 pmolnar hdfs       5458 2017-10-27 11:45 /data/apache-spark/archive.ics.uci.edu/BankMarketing/bank-additional-names.txt
-rw-r--r--   3 pmolnar hdfs     583898 2017-10-27 11:45 /data/apache-spark/archive.ics.uci.edu/BankMarketing/bank-additional.csv
-rw-r--r--   3 pmolnar hdfs    4610348 2017-10-27 11:45 /data/apache-spark/archive.ics.uci.edu/BankMarketing/bank-full.csv
-rw-r--r--   3 pmolnar hdfs       3864 2017-10-27 11:45 /data/apache-spark/archive.ics.uci.edu/BankMarketing/bank-names.txt
-rw-r--r--   3 pmolnar hdfs     461474 2017-10-27 11:45 /data/apache-spark/archive.ics.uci.edu/BankMarketing/bank.csv


In [13]:
%%sh
hdfs dfs -cat /data/apache-spark/archive.ics.uci.edu/BankMarketing/bank-full.csv | head

"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"
58;"management";"married";"tertiary";"no";2143;"yes";"no";"unknown";5;"may";261;1;-1;0;"unknown";"no"
44;"technician";"single";"secondary";"no";29;"yes";"no";"unknown";5;"may";151;1;-1;0;"unknown";"no"
33;"entrepreneur";"married";"secondary";"no";2;"yes";"yes";"unknown";5;"may";76;1;-1;0;"unknown";"no"
47;"blue-collar";"married";"unknown";"no";1506;"yes";"no";"unknown";5;"may";92;1;-1;0;"unknown";"no"
33;"unknown";"single";"unknown";"no";1;"no";"no";"unknown";5;"may";198;1;-1;0;"unknown";"no"
35;"management";"married";"tertiary";"no";231;"yes";"no";"unknown";5;"may";139;1;-1;0;"unknown";"no"
28;"management";"single";"tertiary";"no";447;"yes";"yes";"unknown";5;"may";217;1;-1;0;"unknown";"no"
42;"entrepreneur";"divorced";"tertiary";"yes";2;"yes";"no";"unknown";5;"may";380;1;-1;0;"unknown";"no"
58;"retired";"married";"primary";"no";121;"yes

cat: Unable to write to output stream.


We're going to load the data set as DataFrame (not RDD) and then create LabeledPoint objects

In [21]:
from pyspark.sql import SQLContext, HiveContext, Row
import pyspark.sql.functions as F

In [14]:
DATAFILE = '/data/apache-spark/archive.ics.uci.edu/BankMarketing/bank-additional-full.csv'

In [20]:
rdd = sc.textFile(DATAFILE).map(lambda r: r.replace('"', ''))

In [21]:
rdd.take(10)

[u'age;job;marital;education;default;housing;loan;contact;month;day_of_week;duration;campaign;pdays;previous;poutcome;emp.var.rate;cons.price.idx;cons.conf.idx;euribor3m;nr.employed;y',
 u'56;housemaid;married;basic.4y;no;no;no;telephone;may;mon;261;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'57;services;married;high.school;unknown;no;no;telephone;may;mon;149;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'37;services;married;high.school;no;yes;no;telephone;may;mon;226;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'40;admin.;married;basic.6y;no;no;no;telephone;may;mon;151;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'56;services;married;high.school;no;no;yes;telephone;may;mon;307;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'45;services;married;basic.9y;unknown;no;no;telephone;may;mon;198;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'59;admin.;married;professional.course;no;no;no;telephone;may;mon;139;1;999;0;nonexistent;1.1;93

Take the top row and use for column names

In [22]:
top_row = rdd.take(1)[0]
rdd2 = rdd.filter(lambda row: row!=top_row)
rdd2.take(4)

[u'56;housemaid;married;basic.4y;no;no;no;telephone;may;mon;261;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'57;services;married;high.school;unknown;no;no;telephone;may;mon;149;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'37;services;married;high.school;no;yes;no;telephone;may;mon;226;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no',
 u'40;admin.;married;basic.6y;no;no;no;telephone;may;mon;151;1;999;0;nonexistent;1.1;93.994;-36.4;4.857;5191;no']

In [23]:
rdd3 = rdd2.map(lambda r: tuple(r.split(';')))
rdd3.take(1)

[(u'56',
  u'housemaid',
  u'married',
  u'basic.4y',
  u'no',
  u'no',
  u'no',
  u'telephone',
  u'may',
  u'mon',
  u'261',
  u'1',
  u'999',
  u'0',
  u'nonexistent',
  u'1.1',
  u'93.994',
  u'-36.4',
  u'4.857',
  u'5191',
  u'no')]

In [24]:
columns = top_row.split(';')
columns

[u'age',
 u'job',
 u'marital',
 u'education',
 u'default',
 u'housing',
 u'loan',
 u'contact',
 u'month',
 u'day_of_week',
 u'duration',
 u'campaign',
 u'pdays',
 u'previous',
 u'poutcome',
 u'emp.var.rate',
 u'cons.price.idx',
 u'cons.conf.idx',
 u'euribor3m',
 u'nr.employed',
 u'y']

## Creating  a Table (DataFrame)

In [26]:
import pyspark.sql.types as T

In [27]:
schema = T.StructType()

In [28]:
import re
float_patt = re.compile(r'^-{0,1}\d+\.')
int_patt = re.compile(r'^-{0,1}\d+')
two_rows = rdd.map(lambda r: r.split(';')).take(2)
for i in range(len(two_rows[0])):
    print i, two_rows[0][i], two_rows[1][i]
    if float_patt.match(two_rows[1][i]):
        schema.add(T.StructField(name=two_rows[0][i], dataType=T.FloatType(), nullable=True))
    elif int_patt.match(two_rows[1][i]):
        schema.add(T.StructField(name=two_rows[0][i], dataType=T.IntegerType(), nullable=True))
    else:
        schema.add(T.StructField(name=two_rows[0][i], dataType=T.StringType(), nullable=True))

0 age 56
1 job housemaid
2 marital married
3 education basic.4y
4 default no
5 housing no
6 loan no
7 contact telephone
8 month may
9 day_of_week mon
10 duration 261
11 campaign 1
12 pdays 999
13 previous 0
14 poutcome nonexistent
15 emp.var.rate 1.1
16 cons.price.idx 93.994
17 cons.conf.idx -36.4
18 euribor3m 4.857
19 nr.employed 5191
20 y no


In [29]:
schema

StructType(List(StructField(age,IntegerType,true),StructField(job,StringType,true),StructField(marital,StringType,true),StructField(education,StringType,true),StructField(default,StringType,true),StructField(housing,StringType,true),StructField(loan,StringType,true),StructField(contact,StringType,true),StructField(month,StringType,true),StructField(day_of_week,StringType,true),StructField(duration,IntegerType,true),StructField(campaign,IntegerType,true),StructField(pdays,IntegerType,true),StructField(previous,IntegerType,true),StructField(poutcome,StringType,true),StructField(emp.var.rate,FloatType,true),StructField(cons.price.idx,FloatType,true),StructField(cons.conf.idx,FloatType,true),StructField(euribor3m,FloatType,true),StructField(nr.employed,IntegerType,true),StructField(y,StringType,true)))

In [30]:
df = sqlCtx.createDataFrame(rdd2.map(lambda r: tuple(r.split(';'))), schema=schema)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp.var.rate: float (nullable = true)
 |-- cons.price.idx: float (nullable = true)
 |-- cons.conf.idx: float (nullable = true)
 |-- euribor3m: float (nullable = true)
 |-- nr.employed: integer (nullable = true)
 |-- y: string (nullable = true)



In [32]:
df.limit(10).toPandas()


Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,...,campaign,pdays,previous,poutcome,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,y
0,,admin.,married,basic.9y,no,yes,no,cellular,aug,tue,...,,,,nonexistent,,,,,,no
1,,technician,single,university.degree,no,yes,no,cellular,aug,tue,...,,,,nonexistent,,,,,,yes
2,,services,married,university.degree,no,no,no,cellular,aug,tue,...,,,,nonexistent,,,,,,no
3,,technician,single,professional.course,no,yes,no,cellular,aug,tue,...,,,,nonexistent,,,,,,yes
4,,admin.,married,university.degree,no,yes,no,cellular,aug,tue,...,,,,nonexistent,,,,,,no
5,,technician,single,high.school,no,yes,yes,cellular,aug,tue,...,,,,nonexistent,,,,,,yes
6,,technician,single,university.degree,no,no,no,cellular,aug,tue,...,,,,nonexistent,,,,,,no
7,,management,married,basic.9y,no,yes,no,cellular,aug,tue,...,,,,nonexistent,,,,,,no
8,,technician,single,university.degree,no,no,yes,cellular,aug,tue,...,,,,nonexistent,,,,,,yes
9,,technician,married,professional.course,no,no,no,cellular,aug,tue,...,,,,nonexistent,,,,,,no


## Learning something about the DataFrame
http://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html

In [33]:
df.registerTempTable('worldbank')

In [36]:
sqlContext.sql("""
SELECT y, COUNT(*) AS N
FROM worldbank
GROUP BY y
""").show()

+---+-----+
|  y|    N|
+---+-----+
| no|36548|
|yes| 4640|
+---+-----+



In [59]:
df.select(df.education).printSchema()

root
 |-- education: string (nullable = true)



In [56]:
agg_education_pdf = df.groupBy(['education', 'y']).agg({'y':'count'}).sort('education', 'y').toPandas()
agg_education_pdf.pivot_table(index='y', columns='education')

Unnamed: 0_level_0,count(y),count(y),count(y),count(y),count(y),count(y),count(y),count(y)
education,basic.4y,basic.6y,basic.9y,high.school,illiterate,professional.course,university.degree,unknown
y,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2
no,3748,2104,5572,8484,14,4648,10498,1480
yes,428,188,473,1031,4,595,1670,251


In [42]:
df.select('age', 'education', 'y').printSchema()

root
 |-- age: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- y: string (nullable = true)



## Creating Features

In [88]:
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

def prepareFeatures(df, label_col, featr_col):
    
    def gettypes(df):
        d = {}
        for t in df.dtypes:
            d[t[0]] = t[1]
        return d

    df_types = gettypes(df)

    ## process label
    if df_types[label_col]=='string':
        lbl_indexer = StringIndexer(inputCol=label_col, outputCol='label')
        df_tmp = lbl_indexer.fit(df).transform(df)
    else:
        ### assume is fine ...
        df_tmp = df.withColumn('label', F.col(label_col).cast('float'))
        
    fv_cols = []
    ## process categorical cols... there are strings
    featr_string_col = filter(lambda s: df_types[s]=='string', featr_col)
    
    for ftr in featr_string_col:
        print "Process feature '%s'"%str(ftr)
        indexer = StringIndexer(inputCol=ftr, outputCol=ftr+'_IDX')
        df_tmp = indexer.fit(df_tmp).transform(df_tmp)
        encoder = OneHotEncoder(dropLast=False, inputCol=ftr+'_IDX', outputCol=ftr+'_FV')
        df_tmp = encoder.transform(df_tmp)
        fv_cols += [ftr+'_FV']
    
    ## process other types...
    ## doing nothing here right now, but if: make sure to add names to `fv_cols`
        
    ## combine all feature vectors into one column `features`
    
    assembler_features = VectorAssembler(inputCols=fv_cols, outputCol='features')
    df_tmp = assembler_features.transform(df_tmp)
    return df_tmp

In [90]:
df2 = prepareFeatures(df, 'y', ['age', 'education'])
df2.printSchema()

Process feature 'education'
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp.var.rate: float (nullable = true)
 |-- cons.price.idx: float (nullable = true)
 |-- cons.conf.idx: float (nullable = true)
 |-- euribor3m: float (nullable = true)
 |-- nr.employed: integer (nullable = true)
 |-- y: string (nullable = true)
 |-- label: double (nullable = true)
 |-- education_IDX: double (nullable = true)
 |-- education_FV: vector (nullable = true)
 |-- fe

In [91]:
df2.select('y', 'age', 'education', 'label', 'features').limit(10).toPandas()

Unnamed: 0,y,age,education,label,features
0,no,,basic.4y,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)"
1,no,,high.school,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
2,no,,high.school,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
3,no,,basic.6y,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)"
4,no,,high.school,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
5,no,,basic.9y,0.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
6,no,,professional.course,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)"
7,no,,unknown,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)"
8,no,,professional.course,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)"
9,no,,high.school,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"


In [80]:
def gettypes(df):
    d = {}
    for t in df.dtypes:
        d[t[0]] = t[1]
    return d

In [81]:
gettypes(df)

{'age': 'int',
 'campaign': 'int',
 'cons.conf.idx': 'float',
 'cons.price.idx': 'float',
 'contact': 'string',
 'day_of_week': 'string',
 'default': 'string',
 'duration': 'int',
 'education': 'string',
 'emp.var.rate': 'float',
 'euribor3m': 'float',
 'housing': 'string',
 'job': 'string',
 'loan': 'string',
 'marital': 'string',
 'month': 'string',
 'nr.employed': 'int',
 'pdays': 'int',
 'poutcome': 'string',
 'previous': 'int',
 'y': 'string'}

In [92]:
#prepareFeatures(df, 'y', ['age', 'job', 'marital'])

In [98]:
df_proc = prepareFeatures(df, 'y', columns[:-1])
df_proc.printSchema()

Process feature 'job'
Process feature 'marital'
Process feature 'education'
Process feature 'default'
Process feature 'housing'
Process feature 'loan'
Process feature 'contact'
Process feature 'month'
Process feature 'day_of_week'
Process feature 'poutcome'
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp.var.rate: float (nullable = true)
 |-- cons.price.idx: float (nullable = true)
 |-- cons.conf.idx: float (nullable = true)
 |-- euribor3m: floa

In [99]:
numClasses = int(df_proc.agg(F.max('label')).collect()[0][0])+1
print numClasses

2


In [102]:
(trainingData, testData) = df_proc.randomSplit([0.7, 0.3])

from pyspark.ml.classification import RandomForestClassifier
##from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier( numTrees=100, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=10, maxBins=32)
model = rf.fit(trainingData)

result = model.transform(testData)
result.prediction

result.select('label', 'prediction').show()
#cm = res.groupBy('label').pivot('prediction').sum('one')
#

res = result.select(F.col('label').cast('int'), F.col('prediction').cast('int')).withColumn('one', F.lit(1))

cm = res.groupBy('label').pivot('prediction').sum('one')
cm.show()



# Evaluate model on test instances and compute test error
#predictions = model.predict(testData.map(lambda x: x.features))
#labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
#testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
#print('Test Error = ' + str(testErr))
#print('Learned classification forest model:')
#print(model.toDebugString())

# Save and load model
#model.save(sc, "target/tmp/myRandomForestClassificationModel")
#sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 20 rows

+-----+-----+---+
|label|    0|  1|
+-----+-----+---+
|    0|10728|118|
|    1| 1083|246|
+-----+-----+---+

