In [1]:
############################ Feature extraction
#TF-IDF (HashingTF and IDF)

In [2]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# A tiny labelled corpus shared by the feature-extraction examples below.
# Each row is a (label, sentence) pair; the column names are listed separately.
sentence_rows = [
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat"),
]
sentence_columns = ["label", "sentence"]

sentenceData = sqlContext.createDataFrame(sentence_rows, sentence_columns)
sentenceData.show(truncate=False)

In [3]:
# Tokenization breaks text (such as a sentence) into individual terms, usually words.
# Tokenizer is the basic implementation; below it splits each sentence into a list of words,
# and RegexTokenizer does the same job driven by a regular expression.
from pyspark.ml.feature import RegexTokenizer, Tokenizer

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# transform() returns a new Spark DataFrame: the input columns plus a "words" column.
wordsData = tokenizer.transform(sentenceData)

# collect() brings the "words" column back to the driver as a Python list (one entry per row).
words = wordsData.select('words').collect()
wordsData.show(truncate=False)

# RegexTokenizer allows more advanced tokenization based on regular-expression matching;
# the pattern "\\W" matches any non-word character.
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
w2 = regexTokenizer.transform(sentenceData)
w2.show(truncate=False)

In [4]:
# TF: HashingTF is a Transformer that takes sets of terms (in text processing, a
# "bag of words") and converts each set into a fixed-length feature vector. It combines
# Term Frequency (TF) counts with the hashing trick for dimensionality reduction.

# Hash each tokenized sentence into a term-frequency vector. numFeatures=20 keeps the
# toy vectors readable; with so few buckets, distinct words may hash to the same index.
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)  # returns a new DataFrame with "rawFeatures"

featurizedData.show(truncate=False)
raw = featurizedData.select('rawFeatures').collect()  # a Python list, one entry per row
for i in raw:
    # Single-argument print(...) works on both Python 2 and Python 3 kernels;
    # the old `print i` statement is a SyntaxError on Python 3.
    print(i)

In [5]:
# IDF: IDF is an Estimator that is fit on a dataset and produces an IDFModel. The IDFModel
# takes feature vectors (generally created by HashingTF) and scales each column;
# intuitively, it down-weights columns that appear frequently across the corpus.

# Rescale the raw term-frequency vectors into TF-IDF vectors.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)  # type is: pyspark.ml.feature.IDFModel
rescaledData = idfModel.transform(featurizedData)  # a new DataFrame with a "features" column
# Single-argument print(...) works on both Python 2 and Python 3 kernels.
print(type(rescaledData))
for features_label in rescaledData.select("features", "label").take(3):
    print(features_label)

In [6]:
####### VectorAssembler ########
# VectorAssembler is a Transformer that combines a given list of columns into a single
# vector column. It is useful for merging raw features with features generated by other
# feature transformers into one feature vector, in order to train ML models such as
# logistic regression and decision trees.

try:
    # Spark >= 2.0: DataFrame-based pyspark.ml stages use pyspark.ml.linalg vectors;
    # a column of pyspark.mllib vectors is rejected by VectorAssembler there.
    from pyspark.ml.linalg import Vectors
except ImportError:
    # Spark 1.x has no pyspark.ml.linalg; pyspark.ml consumed mllib vectors then.
    from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = sqlContext.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")
# Adds a "features" vector column built from hour, mobile and the userFeatures vector.
output = assembler.transform(dataset)
output.show(truncate=False)