## One-hot encoding
In this notebook we explore the one-hot encoding provided by PySpark (`StringIndexer` and `OneHotEncoder` in `pyspark.ml.feature`).

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [2]:
# stop active context and start a new instance
sca = SparkContext._active_spark_context
if sca:
    sca.stop()
sc = SparkContext()

### Loading the dataset
We use the 'Bike Buyer' dataset provided by Valentine Fontama in
his 'Buyer Propensity Model' experiment, which is available in the Cortana Intelligence Gallery:
[Buyer Propensity Model](http://gallery.cortanaintelligence.com/Experiment/Buyer-Propensity-Model-2)


In [3]:
# load the bike buyer dataset, skip header line
bbFileLocation = 'file:/datasets/BikeBuyerWithLocation.csv'
bbData = sc.textFile(bbFileLocation)
bbHeader = bbData.first()
bbData = bbData.filter(lambda x: x != bbHeader)
print "Data size is {}".format(bbData.count())
bbData.take(2)

Data size is 10000


[u'29476,Married,Female,20000,0,Partial College,Manual,No,1,0-1 Miles,Europe,47,Yes,31.42728,151.40989,Skegness,U8H 3NI,United Kingdom',
 u'29472,Married,Female,10000,1,High School,Manual,No,1,1-2 Miles,Europe,46,No,-49.777,-57.09436,Krefeld,38010,Germany']

### Preparing the dataset
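Some columns contain missing values (for example, Country is empty for the regions North America and Pacific),
which might cause problems in machine learning algorithms. We therefore convert the label to 0/1, replace
missing countries with the region name, and keep only the columns relevant for further processing.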
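The preparation logic of the next cell can be tried without Spark. Below is a minimal pure-Python sketch, assuming the column positions used by the notebook's `split(",")` indices; `prepare_record` is a hypothetical helper, and unlike the notebook it casts the numeric fields to `float` immediately instead of later with `cast('double')`. Note that Python's `str.split(",")` already keeps trailing empty fields, so the notebook's `split(",", -1)` behaves the same as `split(",")`.

```python
# Hypothetical pure-Python version of the row preparation (no Spark needed).
# Positions follow the notebook: 1 MaritalStatus, 2 Gender, 3 YearlyIncome,
# 4 Children, 5 Education, 6 Occupation, 7 HomeOwner, 8 Cars,
# 9 CommuteDistance, 10 Region, 11 Age, 12 label (Yes/No), 17 Country.

def prepare_record(line):
    """Parse one CSV line into a dict of the selected, cleaned fields."""
    p = line.split(",")
    return {
        "label": 1.0 if p[12] == "Yes" else 0.0,   # same rule as convertYesNo
        "MaritalStatus": p[1],
        "Gender": p[2],
        "YearlyIncome": float(p[3]),
        "Children": float(p[4]),
        "Education": p[5],
        "Occupation": p[6],
        "HomeOwner": p[7],
        "Cars": float(p[8]),
        "CommuteDistance": p[9],
        "Region": p[10],
        "Age": float(p[11]),
        # same rule as convertCountry: an empty country falls back to the region
        "Country": p[17] if p[17] != "" else p[10],
    }

# First record of the dataset, as printed above
sample = ("29476,Married,Female,20000,0,Partial College,Manual,No,1,0-1 Miles,"
          "Europe,47,Yes,31.42728,151.40989,Skegness,U8H 3NI,United Kingdom")
print(prepare_record(sample)["Country"])  # United Kingdom
```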

In [4]:
# Prepare the dataset
# Convert label to 0/1 (as double)
def convertYesNo(strlabel):
    lbl = 0.0
    if (strlabel == 'Yes'):
        lbl = 1.0
    return lbl

# Country not specified for regions North America / Pacific
# In these cases we use the region instead, since empty names are not allowed
def convertCountry(strCountry, strRegion):
    strCountryOut = strCountry
    if (strCountry == ""):
        strCountryOut = strRegion    
    return strCountryOut

# Parse data and create data frames
from pyspark.sql import Row

parts = bbData.map(lambda l: l.split(",", -1))
# select relevant columns for further processing
features = parts.map(lambda p: (convertYesNo(p[12]),p[1],p[2],p[3],p[4],p[5],p[6],
                                p[7], p[8], p[9], p[10],p[11],
                                convertCountry(p[17], p[10])))
featuresRows = features.map(lambda p: Row(label=p[0], MaritalStatus=p[1], Gender=p[2],
                                         YearlyIncome=p[3], Children=p[4], Education=p[5],
                                         Occupation=p[6], HomeOwner=p[7], Cars=p[8],
                                         CommuteDistance=p[9], Region=p[10], Age=p[11],
                                         Country=p[12]))

# sc is an existing spark context
sqlContext = SQLContext(sc)

# Create a DataFrame and register it as a temporary table
dfBB = sqlContext.createDataFrame(featuresRows)
dfBB.registerTempTable("bikebuyers")
dfBB.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Cars: string (nullable = true)
 |-- Children: string (nullable = true)
 |-- CommuteDistance: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HomeOwner: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- YearlyIncome: string (nullable = true)
 |-- label: double (nullable = true)



In [5]:
# Cast numeric features to double
dfBB = dfBB.withColumn("IntAgeTmp", dfBB.Age.cast('double')) \
            .drop("Age") \
            .withColumnRenamed("IntAgeTmp", "Age")
dfBB = dfBB.withColumn("IntCarsTmp", dfBB.Cars.cast('double')) \
            .drop("Cars") \
            .withColumnRenamed("IntCarsTmp", "Cars")
dfBB = dfBB.withColumn("IntChildrenTmp", dfBB.Children.cast('double')) \
            .drop("Children") \
            .withColumnRenamed("IntChildrenTmp", "Children")
dfBB = dfBB.withColumn("IntYearlyIncomeTmp", dfBB.YearlyIncome.cast('double')) \
            .drop("YearlyIncome") \
            .withColumnRenamed("IntYearlyIncomeTmp", "YearlyIncome")  

With pandas, we can display the first five records as a table (transposed, so that each column of `dfBB` is shown as a row).

In [6]:
import pandas as pd
pd.DataFrame(dfBB.take(5),columns=dfBB.columns).transpose()

Column          | 0               | 1           | 2              | 3           | 4
----------------|-----------------|-------------|----------------|-------------|----------------
CommuteDistance | 0-1 Miles       | 1-2 Miles   | 2-5 Miles      | 1-2 Miles   | 2-5 Miles
Country         | United Kingdom  | Germany     | United Kingdom | France      | United Kingdom
Education       | Partial College | High School | High School    | High School | Partial College
Gender          | Female          | Female      | Female         | Male        | Male
HomeOwner       | No              | No          | No             | No          | No
MaritalStatus   | Married         | Married     | Single         | Single      | Single
Occupation      | Manual          | Manual      | Manual         | Manual      | Manual
Region          | Europe          | Europe      | Europe         | Europe      | Europe
label           | 1               | 0           | 0              | 0           | 0
Age             | 47              | 46          | 46             | 46          | 64


### One-hot encoding
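One-hot encoding converts a categorical variable into a binary vector with one slot per category,
so that it can be used by algorithms that expect numerical input. In Spark ML this takes two stages:
`StringIndexer` maps each string value to a numeric index, and `OneHotEncoder` maps that index to a
(sparse) binary vector. Let's set up both stages for every categorical feature.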
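A minimal pure-Python sketch of the second stage, turning a category index into a dense one-hot vector. The helper `one_hot` is hypothetical; its `drop_last` flag mirrors `OneHotEncoder`'s default `dropLast=True`, under which a column with n categories gets n - 1 slots and the last category becomes the all-zero vector. Spark itself returns a `SparseVector` rather than a Python list.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot vector (list of floats) for a category index.

    With drop_last=True the vector has num_categories - 1 slots and the
    last category (index num_categories - 1) is encoded as all zeros.
    """
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:          # the dropped last category keeps all zeros
        vec[int(index)] = 1.0
    return vec

# Gender as indexed later in this notebook: Male -> 0.0, Female -> 1.0
print(one_hot(0.0, 2))                   # [1.0]
print(one_hot(1.0, 2))                   # [0.0]
print(one_hot(1.0, 2, drop_last=False))  # [0.0, 1.0]
```

Dropping the last slot avoids a redundant column: if all other slots are zero, the value must be the last category.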

In [7]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
MaritalStatusIndexer = StringIndexer(inputCol="MaritalStatus", outputCol="indMaritalStatus")
MaritalStatusEncoder = OneHotEncoder(inputCol="indMaritalStatus", outputCol="vecMaritalStatus")
GenderIndexer = StringIndexer(inputCol="Gender", outputCol="indGender")
GenderEncoder = OneHotEncoder(inputCol="indGender", outputCol="vecGender")
HomeOwnerIndexer = StringIndexer(inputCol="HomeOwner", outputCol="indHomeOwner")
HomeOwnerEncoder = OneHotEncoder(inputCol="indHomeOwner", outputCol="vecHomeOwner")
EducationIndexer = StringIndexer(inputCol="Education", outputCol="indEducation")
EducationEncoder = OneHotEncoder(inputCol="indEducation", outputCol="vecEducation")
OccupationIndexer = StringIndexer(inputCol="Occupation", outputCol="indOccupation")
OccupationEncoder = OneHotEncoder(inputCol="indOccupation", outputCol="vecOccupation")
CommuteDistanceIndexer = StringIndexer(inputCol="CommuteDistance", 
                                       outputCol="indCommuteDistance")
CommuteDistanceEncoder = OneHotEncoder(inputCol="indCommuteDistance", 
                                       outputCol="vecCommuteDistance")
RegionIndexer = StringIndexer(inputCol="Region", outputCol="indRegion")
RegionEncoder = OneHotEncoder(inputCol="indRegion", outputCol="vecRegion")
CountryIndexer = StringIndexer(inputCol="Country", outputCol="indCountry")
CountryEncoder = OneHotEncoder(inputCol="indCountry", outputCol="vecCountry")


#### Setting up a pipeline for one-hot encoding

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
featureAssembler = VectorAssembler(inputCols=["Age","Children","Cars","YearlyIncome",
                                            "vecMaritalStatus","vecGender",
                                            "vecHomeOwner", "vecEducation",
                                            "vecOccupation","vecCommuteDistance", "vecRegion",
                                            "vecCountry"],
                                  outputCol="features")
pipelineOHE = Pipeline(stages=[MaritalStatusIndexer, MaritalStatusEncoder, 
                               GenderIndexer, GenderEncoder, 
                               HomeOwnerIndexer, HomeOwnerEncoder,
                               EducationIndexer, EducationEncoder,
                               OccupationIndexer, OccupationEncoder,
                               CommuteDistanceIndexer, CommuteDistanceEncoder,
                               RegionIndexer, RegionEncoder, CountryIndexer, CountryEncoder,
                               featureAssembler])
modelOHE = pipelineOHE.fit(dfBB)
featuresOHE = modelOHE.transform(dfBB).select("features")
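`VectorAssembler` concatenates its inputs in the order of `inputCols`: each numeric column takes one position and each one-hot vector takes n - 1 positions. The hypothetical helper below computes that layout in plain Python. The widths for Education, Occupation, CommuteDistance, Region, Country and Gender come from the index listings later in this notebook; MaritalStatus and HomeOwner are assumed to have two values each, which is consistent with the total size of 25 seen in the output.

```python
def feature_layout(inputs):
    """inputs: list of (name, width) pairs in VectorAssembler order.
    Returns a dict name -> (start, end_exclusive) and the total size."""
    layout = {}
    offset = 0
    for name, width in inputs:
        layout[name] = (offset, offset + width)
        offset += width
    return layout, offset

# Width 1 for numeric columns, (#distinct values - 1) for one-hot columns
inputs = [
    ("Age", 1), ("Children", 1), ("Cars", 1), ("YearlyIncome", 1),
    ("vecMaritalStatus", 2 - 1), ("vecGender", 2 - 1), ("vecHomeOwner", 2 - 1),
    ("vecEducation", 5 - 1), ("vecOccupation", 5 - 1),
    ("vecCommuteDistance", 5 - 1), ("vecRegion", 3 - 1), ("vecCountry", 5 - 1),
]
layout, total = feature_layout(inputs)
print(total)                         # 25
print(layout["vecCommuteDistance"])  # (15, 19)
```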

In [9]:
# show first record in dfBB
print "First record in the dataset"
print dfBB.first()

First record in the dataset
Row(CommuteDistance=u'0-1 Miles', Country=u'United Kingdom', Education=u'Partial College', Gender=u'Female', HomeOwner=u'No', MaritalStatus=u'Married', Occupation=u'Manual', Region=u'Europe', label=1.0, Age=47.0, Cars=1.0, Children=0.0, YearlyIncome=20000.0)


In [10]:
# show encoded features of first record in dfBB
print "Corresponding one-hot encoded features"
print featuresOHE.first()

Corresponding one-hot encoded features
Row(features=SparseVector(25, {0: 47.0, 2: 1.0, 3: 20000.0, 4: 1.0, 7: 1.0, 15: 1.0, 20: 1.0, 23: 1.0}))


As we can see above, the features are encoded in a sparse vector: only the indices with non-zero values
are listed, together with their values.
For example, index 1 corresponds to the column 'Children'. Since Children = 0 for the first record,
index 1 is omitted.
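To make the sparse notation concrete, here is a small pure-Python sketch that expands the first encoded record into its dense form and back. The helpers `sparse_to_dense` and `dense_to_sparse` are hypothetical; in PySpark the same expansion is available as `SparseVector.toArray()`.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

def dense_to_sparse(dense):
    """Keep only the non-zero entries, returned as (size, indices, values)."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return len(dense), indices, [dense[i] for i in indices]

# First encoded record printed above
idx = [0, 2, 3, 4, 7, 15, 20, 23]
vals = [47.0, 1.0, 20000.0, 1.0, 1.0, 1.0, 1.0, 1.0]
dense = sparse_to_dense(25, idx, vals)
print(dense[:4])  # [47.0, 0.0, 1.0, 20000.0] -> Age, Children, Cars, YearlyIncome
```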

But what is the meaning of 15: 1.0 in the output above?

Let's have a closer look at how the encoding works.

In [11]:
# How does OHE work?
# Column Country (five distinct values in the dataset)
modelCI = CountryIndexer.fit(dfBB)
indexedCountry = modelCI.transform(dfBB)
encodedCountry = CountryEncoder.transform(indexedCountry)
for e in encodedCountry.select("Country", "vecCountry").take(5):
    print e
    print e.vecCountry
# column Gender (two distinct values)
modelGI = GenderIndexer.fit(dfBB)
indexedGender = modelGI.transform(dfBB)
encodedGender = GenderEncoder.transform(indexedGender)
for e in encodedGender.select("Gender", "vecGender").take(5):
    print e
    print e.vecGender

Row(Country=u'United Kingdom', vecCountry=SparseVector(4, {2: 1.0}))
(4,[2],[1.0])
Row(Country=u'Germany', vecCountry=SparseVector(4, {}))
(4,[],[])
Row(Country=u'United Kingdom', vecCountry=SparseVector(4, {2: 1.0}))
(4,[2],[1.0])
Row(Country=u'France', vecCountry=SparseVector(4, {3: 1.0}))
(4,[3],[1.0])
Row(Country=u'United Kingdom', vecCountry=SparseVector(4, {2: 1.0}))
(4,[2],[1.0])
Row(Gender=u'Female', vecGender=SparseVector(1, {}))
(1,[],[])
Row(Gender=u'Female', vecGender=SparseVector(1, {}))
(1,[],[])
Row(Gender=u'Female', vecGender=SparseVector(1, {}))
(1,[],[])
Row(Gender=u'Male', vecGender=SparseVector(1, {0: 1.0}))
(1,[0],[1.0])
Row(Gender=u'Male', vecGender=SparseVector(1, {0: 1.0}))
(1,[0],[1.0])


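Each value is mapped to a sparse vector. By default (`dropLast=True`), `OneHotEncoder` drops the last
category (the one with the highest index), so a column with n distinct values is encoded as a vector of
length n - 1 and that last category is represented by the all-zero vector.
For example, Gender=Female is mapped to SparseVector(1, {}) and Country=Germany is mapped
to SparseVector(4, {}).
The corresponding dense vectors for the countries shown above are: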

Country           | Dense vector | Sparse vector
------------------|--------------|--------------
United Kingdom    | 0010         | {2: 1.0}
France            | 0001         | {3: 1.0}
Germany           | 0000         | {}

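`StringIndexer` assigns the indices by descending frequency in the dataset: the most frequent value gets
index 0.0. Combined with `dropLast`, this means the least frequent value is the one encoded as all zeros.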
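The frequency ordering can be reproduced in plain Python. The sketch below is a hypothetical `frequency_index` helper, not Spark's implementation; it breaks ties alphabetically, which is an assumption of this sketch rather than something the notebook relies on. It rebuilds the Country column from the counts reported in the next cell.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct value to an index by descending frequency.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return dict((v, float(i)) for i, v in enumerate(ordered))

# Country counts as reported by the next cell
country_counts = {"North America": 5456, "Pacific": 1616,
                  "United Kingdom": 991, "France": 984, "Germany": 953}
values = [c for c, n in country_counts.items() for _ in range(n)]
index = frequency_index(values)
# North America -> 0.0, Pacific -> 1.0, United Kingdom -> 2.0,
# France -> 3.0, Germany -> 4.0
print(index["Germany"])  # 4.0
```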

In [12]:
# Show all distinct countries in the dataset with the corresponding indices
countryDistinct = indexedCountry.select("Country", "indCountry") \
                .map(lambda x: (str(x.indCountry) + "<->" + x.Country, 1)) \
                .groupByKey() \
                .map(lambda (k,v): (k, sum(v))) \
                .collect()
print countryDistinct

[(u'1.0<->Pacific', 1616), (u'0.0<->North America', 5456), (u'3.0<->France', 984), (u'2.0<->United Kingdom', 991), (u'4.0<->Germany', 953)]


In [13]:
# Show the number of occurrences of each gender value together with its index
genderDistinct = indexedGender.select("Gender", "indGender") \
                .map(lambda x: (str(x.indGender) + "<->" + x.Gender, 1)) \
                .groupByKey() \
                .map(lambda (k,v): (k, sum(v))) \
                .collect()
print genderDistinct

[(u'1.0<->Female', 4887), (u'0.0<->Male', 5113)]


In [14]:
# Regions ...
modelRI = RegionIndexer.fit(dfBB)
indexedRegion = modelRI.transform(dfBB)
regionDistinct = indexedRegion.select("Region", "indRegion") \
                .map(lambda x: (str(x.indRegion) + "<->" + x.Region, 1)) \
                .groupByKey() \
                .map(lambda (k,v): (k, sum(v))) \
                .collect()
print regionDistinct

[(u'0.0<->North America', 5456), (u'1.0<->Europe', 2928), (u'2.0<->Pacific', 1616)]


In [15]:
# Education 
modelEI = EducationIndexer.fit(dfBB)
indexedEducation = modelEI.transform(dfBB)
educationDistinct = indexedEducation.select("Education", "indEducation") \
                .map(lambda x: (str(x.indEducation) + "<->" + x.Education, 1)) \
                .groupByKey() \
                .map(lambda (k,v): (k, sum(v))) \
                .collect()
print educationDistinct

[(u'0.0<->Partial College', 2698), (u'3.0<->Graduate Degree', 1635), (u'2.0<->High School', 1979), (u'1.0<->Bachelors', 2578), (u'4.0<->Partial High School', 1110)]


In [16]:
# Occupation
modelOI = OccupationIndexer.fit(dfBB)
indexedOccupation = modelOI.transform(dfBB)
occupationDistinct = indexedOccupation.select("Occupation", "indOccupation") \
                .map(lambda x: (str(x.indOccupation) + "<->" + x.Occupation, 1)) \
                .groupByKey() \
                .map(lambda (k,v): (k, sum(v))) \
                .collect()
print occupationDistinct

[(u'4.0<->Manual', 1399), (u'3.0<->Clerical', 1423), (u'2.0<->Management', 1730), (u'1.0<->Skilled Manual', 2495), (u'0.0<->Professional', 2953)]


In [17]:
# Commute distances
modelCDI = CommuteDistanceIndexer.fit(dfBB)
indexedCommuteDistance = modelCDI.transform(dfBB)
CommuteDistanceDistinct = \
                indexedCommuteDistance.select("CommuteDistance", "indCommuteDistance") \
                .map(lambda x: (str(x.indCommuteDistance) + "<->" + x.CommuteDistance, 1)) \
                .groupByKey() \
                .map(lambda (k,v): (k, sum(v))) \
                .collect()
print CommuteDistanceDistinct

[(u'4.0<->2-5 Miles', 1520), (u'0.0<->0-1 Miles', 3081), (u'3.0<->10+ Miles', 1630), (u'2.0<->1-2 Miles', 1810), (u'1.0<->5-10 Miles', 1959)]


In [18]:
# Let us inspect the encoded features for the first ten records.
for x in featuresOHE.take(10):
    print(x.features)

(25,[0,2,3,4,7,15,20,23],[47.0,1.0,20000.0,1.0,1.0,1.0,1.0,1.0])
(25,[0,1,2,3,4,9,17,20],[46.0,1.0,1.0,10000.0,1.0,1.0,1.0,1.0])
(25,[0,1,2,3,9,20,23],[46.0,1.0,1.0,10000.0,1.0,1.0,1.0])
(25,[0,1,2,3,5,9,17,20,24],[46.0,1.0,1.0,10000.0,1.0,1.0,1.0,1.0,1.0])
(25,[0,2,3,5,7,20,23],[64.0,1.0,10000.0,1.0,1.0,1.0,1.0])
(25,[0,2,3,8,14,20,23],[68.0,1.0,10000.0,1.0,1.0,1.0,1.0])
(25,[0,3,10,14,15,20],[69.0,20000.0,1.0,1.0,1.0,1.0])
(25,[0,2,3,6,8,14,15,20,23],[69.0,1.0,10000.0,1.0,1.0,1.0,1.0,1.0,1.0])
(25,[0,3,4,6,8,14,15,20,24],[50.0,20000.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])
(25,[0,3,5,6,7,15,20,24],[50.0,10000.0,1.0,1.0,1.0,1.0,1.0,1.0])


### Closing remarks - decoding
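Using the indices computed above, the 25 positions of the feature vector can now be decoded.
`VectorAssembler` places the numeric columns first, followed by the one-hot vectors in the order of
`inputCols`. Each categorical column with n distinct values occupies n - 1 positions; its last (least
frequent) value, e.g. Education=Partial High School or Country=Germany, has no position of its own.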

Index No. | Meaning
----------|---------------------------------------------------------
0         | Age
1         | Children
2         | Cars
3         | YearlyIncome
4         | MaritalStatus_Married   (1: Married, 0: Single)
5         | Gender_Male             
6         | HomeOwner_Yes
7         | Education_Partial_College
8         | Education_Bachelors
9         | Education_High_School
10        | Education_Graduate_Degree
11        | Occupation_Professional
12        | Occupation_Skilled_Manual
13        | Occupation_Management
14        | Occupation_Clerical
15        | CommuteDistance_0_1_Miles
16        | CommuteDistance_5_10_Miles
17        | CommuteDistance_1_2_Miles
18        | CommuteDistance_10+Miles
19        | Region_North_America
20        | Region_Europe
21        | Country_North_America
22        | Country_Pacific
23        | Country_United_Kingdom
24        | Country_France

This mapping is needed, for example, to interpret the coefficients of a logistic regression
model trained on these features.
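A minimal sketch, in plain Python, of decoding a sparse feature vector with the table above. `FEATURE_NAMES` and `decode` are hypothetical names introduced here. The names follow the decoding table, with index 8 spelled `Education_Bachelors` to match the value 'Bachelors' reported by the indexer in cell 15. Since the coefficient vector of a logistic regression model trained on `features` should have the same 25 positions, the same list could be used to label its coefficients.

```python
# Position -> feature name, following the decoding table
FEATURE_NAMES = [
    "Age", "Children", "Cars", "YearlyIncome",
    "MaritalStatus_Married", "Gender_Male", "HomeOwner_Yes",
    "Education_Partial_College", "Education_Bachelors",
    "Education_High_School", "Education_Graduate_Degree",
    "Occupation_Professional", "Occupation_Skilled_Manual",
    "Occupation_Management", "Occupation_Clerical",
    "CommuteDistance_0_1_Miles", "CommuteDistance_5_10_Miles",
    "CommuteDistance_1_2_Miles", "CommuteDistance_10+Miles",
    "Region_North_America", "Region_Europe",
    "Country_North_America", "Country_Pacific",
    "Country_United_Kingdom", "Country_France",
]

def decode(indices, values):
    """Pair each non-zero entry of a sparse feature vector with its name."""
    return dict((FEATURE_NAMES[i], v) for i, v in zip(indices, values))

# First encoded record: Married, Female, Partial College, Manual, 0-1 Miles, UK
first = decode([0, 2, 3, 4, 7, 15, 20, 23],
               [47.0, 1.0, 20000.0, 1.0, 1.0, 1.0, 1.0, 1.0])
print(sorted(first))
```

Female and Manual do not appear in the decoded record because they are the dropped (all-zero) categories of their columns.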