Input dataset: the Mushroom Classification dataset from Kaggle: https://www.kaggle.com/uciml/mushroom-classification

The goal here is to practice PySpark; a more careful analysis of the data will be done elsewhere.

In [1]:
! head -5 mushrooms.csv

class,cap-shape,cap-surface,cap-color,bruises,odor,gill-attachment,gill-spacing,gill-size,gill-color,stalk-shape,stalk-root,stalk-surface-above-ring,stalk-surface-below-ring,stalk-color-above-ring,stalk-color-below-ring,veil-type,veil-color,ring-number,ring-type,spore-print-color,population,habitat
p,x,s,n,t,p,f,c,n,k,e,e,s,s,w,w,p,w,o,p,k,s,u
e,x,s,y,t,a,f,c,b,k,e,c,s,s,w,w,p,w,o,p,n,n,g
e,b,s,w,t,l,f,c,b,n,e,c,s,s,w,w,p,w,o,p,n,n,m
p,x,y,w,t,p,f,c,n,n,e,e,s,s,w,w,p,w,o,p,k,s,u


In [1]:
# `sc` (the SparkContext) is provided by the running environment -- it is not defined anywhere
# in this notebook. Displaying it confirms that Spark is available before doing any work.
sc

<pyspark.context.SparkContext at 0x101c9d5d0>

### Import Packages

In [2]:
import pandas as pd
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Read Data

If the dataset is small enough, you can read the CSV with pandas first and convert it to a Spark DataFrame. In most cases where Spark is actually needed, however, the data is too large for that, so you should define the schema first and read the data directly with Spark.

In [19]:
# Small dataset: load it with pandas, then hand the pandas frame to Spark.
# No schema is passed, so Spark derives one from the pandas data.
csv_path = 'mushrooms.csv'
df_pd = pd.read_csv(csv_path)
df_sp = spark.createDataFrame(data=df_pd)

In [21]:
# Spark-native read, following the advice above: declare the schema up front instead of
# relying on the CSV header. Every field in this dataset is a single-letter category code
# (see the `head` output), so every column is declared as a nullable string.
MUSHROOM_COLUMNS = ['class', 'cap-shape', 'cap-surface', 'cap-color', 'bruises', 'odor',
                    'gill-attachment', 'gill-spacing', 'gill-size', 'gill-color',
                    'stalk-shape', 'stalk-root', 'stalk-surface-above-ring',
                    'stalk-surface-below-ring', 'stalk-color-above-ring',
                    'stalk-color-below-ring', 'veil-type', 'veil-color', 'ring-number',
                    'ring-type', 'spore-print-color', 'population', 'habitat']
mushroom_schema = StructType([StructField(name, StringType(), True) for name in MUSHROOM_COLUMNS])
# header=True is still needed so the header line is skipped rather than read as a data row.
df_sp1 = spark.read.csv('mushrooms.csv', header=True, schema=mushroom_schema)

In [4]:
# First five rows of the pandas copy
df_pd.head(n=5)

Unnamed: 0,class,cap-shape,cap-surface,cap-color,bruises,odor,gill-attachment,gill-spacing,gill-size,gill-color,...,stalk-surface-below-ring,stalk-color-above-ring,stalk-color-below-ring,veil-type,veil-color,ring-number,ring-type,spore-print-color,population,habitat
0,p,x,s,n,t,p,f,c,n,k,...,s,w,w,p,w,o,p,k,s,u
1,e,x,s,y,t,a,f,c,b,k,...,s,w,w,p,w,o,p,n,n,g
2,e,b,s,w,t,l,f,c,b,n,...,s,w,w,p,w,o,p,n,n,m
3,p,x,y,w,t,p,f,c,n,n,...,s,w,w,p,w,o,p,k,s,u
4,e,x,s,g,f,n,f,w,b,k,...,s,w,w,p,w,o,e,n,a,g


In [5]:
# (rows, columns) of the pandas frame
df_pd.shape

(8124, 23)

In [6]:
# Data cleaning, step 1: count missing values per column.
# (The categorical features still need to be turned into dummy / one-hot columns later.)
pd.isnull(df_pd).sum()

class                       0
cap-shape                   0
cap-surface                 0
cap-color                   0
bruises                     0
odor                        0
gill-attachment             0
gill-spacing                0
gill-size                   0
gill-color                  0
stalk-shape                 0
stalk-root                  0
stalk-surface-above-ring    0
stalk-surface-below-ring    0
stalk-color-above-ring      0
stalk-color-below-ring      0
veil-type                   0
veil-color                  0
ring-number                 0
ring-type                   0
spore-print-color           0
population                  0
habitat                     0
dtype: int64

In [7]:
# Same null check on the Spark side, for the target column only
df_sp.filter(df_sp['class'].isNull()).count()

0

In [8]:
# Value counts of one column. The DataFrame API reads like SQL:
#   SELECT class, COUNT(*) AS count FROM df_sp GROUP BY class
# (other clauses map similarly: where/join/orderBy/limit)
a = df_sp.groupBy('class').count()
a.show()

+-----+-----+
|class|count|
+-----+-----+
|    e| 4208|
|    p| 3916|
+-----+-----+



### Create label column for prediction (Label Encoding Example)

In [23]:
# StringIndexer plays the role of sklearn's LabelEncoder: each string category gets a numeric
# index, with the most frequent category mapped to 0.0 (here: e -> 0.0, p -> 1.0).
label_indexer_model = StringIndexer(inputCol="class", outputCol="label").fit(df_sp)
df_sp = label_indexer_model.transform(df_sp)
df_sp.select('class', 'label').show(10)

+-----+-----+
|class|label|
+-----+-----+
|    p|  1.0|
|    e|  0.0|
|    e|  0.0|
|    p|  1.0|
|    e|  0.0|
|    e|  0.0|
|    e|  0.0|
|    e|  0.0|
|    p|  1.0|
|    e|  0.0|
+-----+-----+
only showing top 10 rows



### OneHotEncoder Example

In [25]:
# OneHotEncoder consumes numeric category indices, so the string column is indexed first.
cap_shape_indexer = StringIndexer(inputCol='cap-shape', outputCol='indexed_cap-shape')
df_sp = cap_shape_indexer.fit(df_sp).transform(df_sp)

# In the Spark version this notebook ran on, OneHotEncoder is applied with transform() directly.
# NOTE(review): in Spark 3.x OneHotEncoder is an Estimator and needs .fit(df) before transform().
cap_shape_encoder = OneHotEncoder(inputCol='indexed_cap-shape', outputCol='feature_cap-shape')
df_sp = cap_shape_encoder.transform(df_sp)

df_sp.select('cap-shape', 'indexed_cap-shape', 'feature_cap-shape').show()


+---------+-----------------+-----------------+
|cap-shape|indexed_cap-shape|feature_cap-shape|
+---------+-----------------+-----------------+
|        x|              0.0|    (5,[0],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        b|              3.0|    (5,[3],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        b|              3.0|    (5,[3],[1.0])|
|        b|              3.0|    (5,[3],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        b|              3.0|    (5,[3],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        b|              3.0|    (5,[3],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        x|              0.0|    (5,[0],[1.0])|
|        s|              4.0|    (5,[4],[1.0])|
|        f|              1.0|    (5,[1],[1.0])|
|        x|              0.0|    (5,[0],

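After OneHotEncoder, the output is a single vector column (whereas pandas-style one-hot encoding produces one column per category).

e.g. (5,[0],[1.0])
This is a sparse vector of length 5 with the value 1.0 at index 0, i.e. this row has category index 0 (`x`). The length is 5 rather than the number of categories because OneHotEncoder drops the last category by default (`dropLast=True`): a feature with k distinct values becomes a vector of length k − 1, so cap-shape has 6 distinct values.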


In [26]:
# Peek at one full row, now including the label / indexed / feature columns
df_sp.show(n=1)

+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+-----+-----------------+-----------------+
|class|cap-shape|cap-surface|cap-color|bruises|odor|gill-attachment|gill-spacing|gill-size|gill-color|stalk-shape|stalk-root|stalk-surface-above-ring|stalk-surface-below-ring|stalk-color-above-ring|stalk-color-below-ring|veil-type|veil-color|ring-number|ring-type|spore-print-color|population|habitat|label|indexed_cap-shape|feature_cap-shape|
+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+-----+-----

In [27]:
# Index + one-hot encode every remaining categorical column.
# Columns are picked by name instead of by position: the previous `columns[2:-3]` slice only
# worked if earlier cells had appended exactly three columns. Columns that already have a
# feature_ column are skipped, so re-running this cell does not try to re-encode them.
already_encoded = set(df_sp.columns)
raw_feature_columns = [
    c for c in df_sp.columns
    if c not in ('class', 'label')
    and not c.startswith(('indexed_', 'feature_'))
    and 'feature_' + c not in already_encoded
]

for header in raw_feature_columns:
    # Count once and reuse: each distinct().count() launches a full Spark job.
    n_distinct = df_sp.select(header).distinct().count()
    print('%s %d' % (header, n_distinct))
    # A constant column (veil-type has 1 value) carries no information, so it is not encoded.
    if n_distinct > 1:
        indexed_header_name = 'indexed_' + header

        stringIndexer = StringIndexer(inputCol=header, outputCol=indexed_header_name)
        df_sp = stringIndexer.fit(df_sp).transform(df_sp)

        encoder = OneHotEncoder(inputCol=indexed_header_name, outputCol='feature_' + header)
        df_sp = encoder.transform(df_sp)

cap-surface 4
cap-color 10
bruises 2
odor 9
gill-attachment 2
gill-spacing 2
gill-size 2
gill-color 12
stalk-shape 2
stalk-root 5
stalk-surface-above-ring 4
stalk-surface-below-ring 4
stalk-color-above-ring 9
stalk-color-below-ring 9
veil-type 1
veil-color 4
ring-number 3
ring-type 5
spore-print-color 9
population 6
habitat 7


In [28]:
# One row again: indexed_* and feature_* columns now exist for every encoded feature
df_sp.show(n=1)

+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+-----+-----------------+-----------------+-------------------+-------------------+-----------------+-----------------+---------------+---------------+------------+-------------+-----------------------+-----------------------+--------------------+--------------------+-----------------+-----------------+------------------+------------------+-------------------+-------------------+------------------+------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------+-----

### Generate the Vector

In [14]:
# Gather every one-hot column (in DataFrame column order) into a single "features" vector,
# which is the input column Spark ML classifiers read.
lst_features = [name for name in df_sp.columns if name.startswith('feature_')]
assembler = VectorAssembler(inputCols=lst_features, outputCol="features")
df_sp = assembler.transform(df_sp)
df_sp.select('features').head(5)

[Row(features=SparseVector(95, {0: 1.0, 6: 1.0, 8: 1.0, 24: 1.0, 26: 1.0, 27: 1.0, 36: 1.0, 43: 1.0, 45: 1.0, 48: 1.0, 51: 1.0, 59: 1.0, 67: 1.0, 70: 1.0, 72: 1.0, 78: 1.0, 86: 1.0, 93: 1.0})),
 Row(features=SparseVector(95, {0: 1.0, 6: 1.0, 11: 1.0, 22: 1.0, 26: 1.0, 27: 1.0, 28: 1.0, 36: 1.0, 44: 1.0, 45: 1.0, 48: 1.0, 51: 1.0, 59: 1.0, 67: 1.0, 70: 1.0, 72: 1.0, 77: 1.0, 87: 1.0, 90: 1.0})),
 Row(features=SparseVector(95, {3: 1.0, 6: 1.0, 12: 1.0, 23: 1.0, 26: 1.0, 27: 1.0, 28: 1.0, 32: 1.0, 44: 1.0, 45: 1.0, 48: 1.0, 51: 1.0, 59: 1.0, 67: 1.0, 70: 1.0, 72: 1.0, 77: 1.0, 87: 1.0, 94: 1.0})),
 Row(features=SparseVector(95, {0: 1.0, 5: 1.0, 12: 1.0, 24: 1.0, 26: 1.0, 27: 1.0, 32: 1.0, 43: 1.0, 45: 1.0, 48: 1.0, 51: 1.0, 59: 1.0, 67: 1.0, 70: 1.0, 72: 1.0, 78: 1.0, 86: 1.0, 93: 1.0})),
 Row(features=SparseVector(95, {0: 1.0, 6: 1.0, 9: 1.0, 17: 1.0, 18: 1.0, 26: 1.0, 28: 1.0, 36: 1.0, 40: 1.0, 43: 1.0, 45: 1.0, 48: 1.0, 51: 1.0, 59: 1.0, 67: 1.0, 70: 1.0, 73: 1.0, 77: 1.0, 88: 1.0, 90:

In [15]:
# Names of the one-hot columns that went into the feature vector, in order
print(lst_features)

['feature_cap-shape', 'feature_cap-surface', 'feature_cap-color', 'feature_bruises', 'feature_odor', 'feature_gill-attachment', 'feature_gill-spacing', 'feature_gill-size', 'feature_gill-color', 'feature_stalk-shape', 'feature_stalk-root', 'feature_stalk-surface-above-ring', 'feature_stalk-surface-below-ring', 'feature_stalk-color-above-ring', 'feature_stalk-color-below-ring', 'feature_veil-color', 'feature_ring-number', 'feature_ring-type', 'feature_spore-print-color', 'feature_population', 'feature_habitat']


### Model Training

In [17]:
# Train and evaluate a depth-4 decision tree on three different random 80/20 splits.
# Each split gets an explicit seed so re-running the cell reproduces the same numbers.
dt = DecisionTreeClassifier(maxDepth=4, labelCol="label")
# f1 is this evaluator's default metric; it is named explicitly so the printed score is unambiguous.
# Built once, outside the loop, since it does not depend on the split.
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")

for trial in range(3):
    df_train, df_test = df_sp.randomSplit([.8, .2], seed=trial)

    model = dt.fit(df_train)
    df_predicted = model.transform(df_test.select('features', 'label'))

    print('trial %d test f1: %s' % (trial, evaluator.evaluate(df_predicted)))
    # Distribution of predicted classes on the test split
    df_predicted.groupBy('prediction').count().show()


0.996930004934
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  871|
|       1.0|  758|
+----------+-----+

0.996886499886
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  820|
|       1.0|  786|
+----------+-----+

0.997569360388
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  895|
|       1.0|  751|
+----------+-----+

