# Topic Modeling with PySpark

   - **Supported by TingLin**
   - **Kansas State University**

# Table of Contents
  - **Extracting, transforming and selecting features**
    
- **Feature Extractors**
     - [TF-IDF](#TF-IDF)
     - [Word2Vec](#Word2Vec)
     - [CountVectorizer](#CountVectorizer)
- **Feature Transformers**
     - [Tokenizer](#Tokenizer)
     - [StopWordsRemover](#StopWordsRemover)
     - [n-gram](#nn-gram)
     - [Binarizer](#Binarizer)
     - [PCA](#PCA)
     - [PolynomialExpansion](#PolynomialExpansion)
     - [Discrete Cosine Transform (DCT)](#Discrete Cosine Transform)
     - [StringIndexer](#StringIndexer)
     - [IndexToString](#IndexToString)
     - [OneHotEncoder](#OneHotEncoder)
     - [VectorIndexer](#VectorIndexer)
     - [Interaction](#Interaction)
     - [Normalizer](#Normalizer)
     - [StandardScaler](#StandardScaler)
     - [MinMaxScaler](#MinMaxScaler)
     - [MaxAbsScaler](#MaxAbsScaler)
     - [Bucketizer](#Bucketizer)
     - [ElementwiseProduct](#ElementwiseProduct)
     - [SQLTransformer](#SQLTransformer)
     - [VectorAssembler](#VectorAssembler)
     - [QuantileDiscretizer](#QuantileDiscretizer)
- **Feature Selectors**
     - [VectorSlicer](#VectorSlicer)
     - [RFormula](#RFormula)
     - [ChiSqSelector](#ChiSqSelector)
- **Clustering**
     - [LDA](#LDA)
- **LDA Topic Modeling with csv file**
     - [LDA Topic Modeling with csv file](#LDA Topic Modeling with csv file)
- **Visualization**
     - [Visualization](#Visualization)


# 1. What am I going to learn from this PySpark Tutorial

- Apache Spark comes with an interactive shell for Python, as it does for Scala. The Python shell is known as “PySpark”. To use PySpark you need Python installed on your machine. Most Linux distributions ship with Python preinstalled, so this is usually not a concern. To get started in standalone mode, you can download the pre-built version of Spark from its official home page listed in the pre-requisites section of the PySpark tutorial.
- Decompress the downloaded file. The extracted Spark distribution has the following structure:
    - bin: Holds all the binaries
    - conf: Holds all the necessary configuration files to run any Spark application
    - ec2: Holds the scripts to launch a cluster on the Amazon cloud with multiple EC2 instances
    - lib: Holds the prebuilt libraries which make up the Spark APIs
    - licenses: Holds the license files
    - python: Holds the Python API
    - README.md: Holds important instructions to get started with Spark
    - sbin: Holds important startup scripts that are required to set up a distributed cluster
    - CHANGES.txt: Holds the change information for each version of Apache Spark
    - data: Holds data that is used in the examples
    - examples: Has examples which are a good place to learn the usage of Spark functions
    - LICENSE, NOTICE: Important information
    - R: Holds the API of the R language
    - RELEASE: Holds make info of the downloaded version

- Hadoop enables distributed, low-cost storage for the growing amount of unstructured data. In this post, I'll show one way to analyze unstructured data using Apache Spark. Spark is advantageous for text analytics because it provides a platform for scalable, distributed computing.

- When it comes to text analytics, you have a few options for analyzing text. I like to categorize these techniques like this:

    - Text Mining (i.e. Text clustering, data-driven topics)
    - Categorization (i.e. Tagging unstructured data into categories and sub-categories; hierarchies; taxonomies)
    - Entity Extraction (i.e. Extracting patterns such as phrases, addresses, product codes, phone numbers, etc.)
    - Sentiment Analysis (i.e. Tagging positive, negative, or neutral with varying levels of sentiment)
    - Deep Linguistics (i.e. Semantics. Understanding causality, purpose, time, etc.)

In [1]:
from pyspark.sql import SQLContext, Row
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.ml.feature import StopWordsRemover

In [2]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)


# 2. Extracting, transforming and selecting features

- **2.1 Feature Extractors**
<a id = 'Feature Extractors'></a>
   - **2.1.1 TF-IDF**
    <a id = 'TF-IDF'></a>

In [32]:

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = sqlContext.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[0,5,9,17],[0...|
|  0.0|(20,[2,7,9,13,15]...|
|  1.0|(20,[4,6,13,15,18...|
+-----+--------------------+



- Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. Denote a term by $t$, a document by $d$, and the corpus by $D$. Term frequency $TF(t, d)$ is the number of times that term $t$ appears in document $d$, while document frequency $DF(t, D)$ is the number of documents that contain term $t$. If we only use term frequency to measure the importance, it is very easy to over-emphasize terms that appear very often but carry little information about the document, e.g. “a”, “the”, and “of”. If a term appears very often across the corpus, it means it doesn’t carry special information about a particular document. Inverse document frequency is a numerical measure of how much information a term provides:
    - $IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1},$
- where $|D|$ is the total number of documents in the corpus. Since logarithm is used, if a term appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF:
    - $TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D).$
- There are several variants on the definition of term frequency and document frequency. In MLlib, we separate TF and IDF to make them flexible.

- TF: Both HashingTF and CountVectorizer can be used to generate the term frequency vectors.

    - HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a “set of terms” might be a bag of words. HashingTF utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. The hash function used here is MurmurHash 3. Then term frequencies are calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing. To reduce the chance of collision, we can increase the target feature dimension, i.e. the number of buckets of the hash table. Since a simple modulo is used to transform the hash function to a column index, it is advisable to use a power of two as the feature dimension, otherwise the features will not be mapped evenly to the columns. The default feature dimension is $2^{18} = 262{,}144$. An optional binary toggle parameter controls term frequency counts. When set to true, all nonzero frequency counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.

    - CountVectorizer converts text documents to vectors of term counts. Refer to CountVectorizer for more details.
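The hashing trick described for HashingTF can be sketched in plain Python. This is only an illustration under stated assumptions: `zlib.crc32` stands in for MurmurHash 3 and `hashing_tf` is a hypothetical helper, so the bucket indices will not match the ones Spark produces.

```python
import zlib


def hashing_tf(terms, num_features=16):
    """Map terms to a fixed-length term-frequency vector via hashing.

    Assumption: zlib.crc32 stands in for MurmurHash 3, so the bucket
    indices differ from those of Spark's HashingTF.
    """
    vector = [0.0] * num_features
    for term in terms:
        # A simple modulo turns the hash value into a column index.
        index = zlib.crc32(term.encode("utf-8")) % num_features
        # Colliding terms share a bucket and their counts add up.
        vector[index] += 1.0
    return vector


words = "hi i heard about spark".split(" ")
tf_vector = hashing_tf(words, num_features=16)
print(tf_vector)
```

No vocabulary is built at any point, which is why the approach scales well but cannot tell colliding terms apart.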

- IDF: IDF is an Estimator which is fit on a dataset and produces an IDFModel. The IDFModel takes feature vectors (generally created from HashingTF or CountVectorizer) and scales each column. Intuitively, it down-weights columns which appear frequently in a corpus.
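To make the smoothed IDF formula concrete, here is a small worked example in plain Python on the three sentences used in this section. The helper names `idf` and `tf_idf` are hypothetical and the natural logarithm is assumed; the sketch follows the formula as written above rather than Spark's internals.

```python
import math

docs = [
    "hi i heard about spark".split(" "),
    "i wish java could use case classes".split(" "),
    "logistic regression models are neat".split(" "),
]


def idf(term, corpus):
    """Smoothed inverse document frequency: log((|D| + 1) / (DF + 1))."""
    df = sum(1 for doc in corpus if term in doc)
    return math.log((len(corpus) + 1) / (df + 1))


def tf_idf(term, doc, corpus):
    """TF-IDF as the product of the raw term count and the IDF."""
    return doc.count(term) * idf(term, corpus)


print(idf("i", docs))      # "i" appears in 2 of 3 documents
print(idf("spark", docs))  # "spark" appears in 1 of 3 documents
print(tf_idf("spark", docs[0], docs))
```

The rarer term "spark" receives the larger weight, and a term that occurs in every document would get an IDF of exactly 0.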


   - **2.1.2 Word2Vec**
    <a id = 'Word2Vec'></a>

In [33]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = sqlContext.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.00754214553162,-0.0373112341389,0.0177642568946]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [-0.0172512845269,0.0307334170876,0.0469989763972]

Text: [Logistic, regression, models, are, neat] => 
Vector: [0.101050335169,-0.0430820055306,0.00582689233124]



- Word2Vec is an Estimator which takes sequences of words representing documents and trains a Word2VecModel. The model maps each word to a unique fixed-size vector. The Word2VecModel transforms each document into a vector using the average of the vectors of all words in the document; this vector can then be used as features for prediction, document similarity calculations, etc.
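The averaging step can be illustrated without training a model. The sketch below uses made-up 3-dimensional word vectors (they are not learned by Word2Vec), and `average_vectors` is a hypothetical helper; skipping unknown words is an assumption of this sketch, not a statement about Spark's behaviour.

```python
def average_vectors(words, word_vectors):
    """Average the vectors of the words in one document."""
    # Assumption: words without a vector are simply skipped.
    vectors = [word_vectors[w] for w in words if w in word_vectors]
    if not vectors:
        return []
    size = len(vectors[0])
    return [sum(v[i] for v in vectors) / len(vectors) for i in range(size)]


# Hypothetical word vectors, chosen so the arithmetic is easy to follow.
toy_vectors = {
    "hi": [1.0, 0.0, 2.0],
    "spark": [3.0, 2.0, 0.0],
}

doc_vector = average_vectors(["hi", "spark"], toy_vectors)
print(doc_vector)  # [2.0, 1.0, 1.0]
```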

  - **2.1.3 CountVectorizer**
    <a id = 'CountVectorizer'></a>

In [34]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with an ID.
df = sqlContext.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



- CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.

- During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.
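The fitting rule (keep terms that occur in at least minDF documents, then take the top vocabSize by corpus-wide term frequency) can be sketched in plain Python. The helpers `fit_vocabulary` and `count_vector` are hypothetical, only an integer document count is handled for `min_df`, and the alphabetical tie-break is an assumption made to keep the result deterministic.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size, min_df):
    """Pick the top vocab_size terms by corpus-wide term frequency,
    keeping only terms that appear in at least min_df documents."""
    term_freq = Counter(term for doc in docs for term in doc)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    eligible = [t for t in term_freq if doc_freq[t] >= min_df]
    # Assumption: ties are broken alphabetically.
    eligible.sort(key=lambda t: (-term_freq[t], t))
    return eligible[:vocab_size]


def count_vector(doc, vocabulary):
    """Count how often each vocabulary term occurs in one document."""
    counts = Counter(doc)
    return [float(counts[t]) for t in vocabulary]


corpus = ["a b c".split(" "), "a b b c a".split(" ")]
vocabulary = fit_vocabulary(corpus, vocab_size=3, min_df=2)
vectors = [count_vector(doc, vocabulary) for doc in corpus]
print(vocabulary)
print(vectors)
```

On this two-document corpus the counts agree with the `features` column printed above, shown here as dense lists rather than sparse vectors.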

- **2.2 Feature Transformers**
<a id = 'Feature Transformers'></a>
   - **2.2.1 Tokenizer**
    <a id = 'Tokenizer'></a> 

In [8]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = sqlContext.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, use pattern="\\w+" together with gaps=False

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case cla

- Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple Tokenizer class provides this functionality. The example above shows how to split sentences into sequences of words.

- RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter “pattern” (regex, default: "\\s+") is used as delimiters to split the input text. Alternatively, users can set parameter “gaps” to false indicating the regex “pattern” denotes “tokens” rather than splitting gaps, and find all matching occurrences as the tokenization result.
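The difference between treating the pattern as a delimiter and treating it as the token itself can be shown with Python's `re` module. This is a rough analogy only: the helper names are hypothetical, and Python's regex engine is assumed to behave like Spark's for these simple patterns.

```python
import re


def split_on_gaps(text, pattern):
    """Treat pattern matches as delimiters (the gaps=True behaviour)."""
    return [tok for tok in re.split(pattern, text.lower()) if tok]


def match_tokens(text, pattern):
    """Treat pattern matches as the tokens (the gaps=False behaviour)."""
    return re.findall(pattern, text.lower())


sentence = "Logistic,regression,models,are,neat"
whitespace_tokens = split_on_gaps(sentence, r"\s+")  # no whitespace: one token
nonword_tokens = split_on_gaps(sentence, r"\W")      # split on non-word characters
matched_tokens = match_tokens(sentence, r"\w+")      # collect runs of word characters
print(whitespace_tokens)
print(nonword_tokens)
print(matched_tokens)
```

Splitting on whitespace leaves the comma-separated sentence as a single token, while both regex variants yield five tokens.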

   - **2.2.2 StopWordsRemover**
    <a id = 'Stopwordsremover'></a>

In [9]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = sqlContext.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



- StopWordsRemover takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stopwords is specified by the stopWords parameter. Default stop words for some languages are accessible by calling StopWordsRemover.loadDefaultStopWords(language), for which available options are “danish”, “dutch”, “english”, “finnish”, “french”, “german”, “hungarian”, “italian”, “norwegian”, “portuguese”, “russian”, “spanish”, “swedish” and “turkish”. A boolean parameter caseSensitive indicates if the matches should be case sensitive (false by default).
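The effect of the caseSensitive flag is easy to see in a few lines of plain Python. The stop-word list below is a tiny hypothetical one, not Spark's default English list, and `remove_stop_words` is an illustrative helper.

```python
def remove_stop_words(tokens, stop_words, case_sensitive=False):
    """Drop every token that matches one of the stop words."""
    if case_sensitive:
        blocked = set(stop_words)
        return [t for t in tokens if t not in blocked]
    blocked = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in blocked]


# Hypothetical stop-word list, just large enough for this example.
stop_words = ["i", "the", "had", "a"]
raw = ["I", "saw", "the", "red", "balloon"]

print(remove_stop_words(raw, stop_words))                       # ['saw', 'red', 'balloon']
print(remove_stop_words(raw, stop_words, case_sensitive=True))  # ['I', 'saw', 'red', 'balloon']
```

With case-sensitive matching the capitalised "I" no longer matches the lowercase stop word and is kept.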

- **2.2.3 n-gram**
<a id = 'nn-gram'></a>

In [39]:

from pyspark.ml.feature import NGram

wordDataFrame = sqlContext.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



- An n-gram is a sequence of $n$ tokens (typically words) for some integer $n$. The NGram class can be used to transform input features into n-grams.

- NGram takes as input a sequence of strings (e.g. the output of a Tokenizer). The parameter n is used to determine the number of terms in each n-gram. The output will consist of a sequence of n-grams where each n-gram is represented by a space-delimited string of n consecutive words. If the input sequence contains fewer than n strings, no output is produced.
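The sliding-window idea behind n-grams fits in one list comprehension. The function `ngrams` below is a hypothetical stand-alone helper, not part of PySpark.

```python
def ngrams(tokens, n):
    """Join every run of n consecutive tokens into a space-delimited string."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


bigrams = ngrams(["Hi", "I", "heard", "about", "Spark"], 2)
print(bigrams)                   # ['Hi I', 'I heard', 'heard about', 'about Spark']
print(ngrams(["Hi", "I"], 3))    # [] because the input is shorter than n
```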

- **2.2.4 Binarizer**
<a id = 'Binarizer'></a>

In [40]:

from pyspark.ml.feature import Binarizer

continuousDataFrame = sqlContext.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()

Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+



- Binarization is the process of thresholding numerical features to binary (0/1) features.

- Binarizer takes the common parameters inputCol and outputCol, as well as the threshold for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. Both Vector and Double types are supported for inputCol.
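The thresholding rule, including the boundary case, can be written out directly. `binarize` is a hypothetical helper that mirrors the rule stated above: strictly greater than the threshold maps to 1.0, everything else to 0.0.

```python
def binarize(values, threshold):
    """Map values above the threshold to 1.0 and all others to 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]


# 0.5 equals the threshold, so it is binarized to 0.0.
binary = binarize([0.1, 0.8, 0.2, 0.5], threshold=0.5)
print(binary)  # [0.0, 1.0, 0.0, 0.0]
```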

- **2.2.5 PCA**
<a id = 'PCA'></a>

In [41]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



- PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. A PCA class trains a model to project vectors to a low-dimensional space using PCA. The example above shows how to project 5-dimensional feature vectors into 3-dimensional principal components.

- **2.2.6 PolynomialExpansion**
<a id = 'PolynomialExpansion'></a>

In [11]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = sqlContext.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)

+----------+------------------------------------------+
|features  |polyFeatures                              |
+----------+------------------------------------------+
|[2.0,1.0] |[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0]     |
|[0.0,0.0] |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]     |
|[3.0,-1.0]|[3.0,9.0,27.0,-1.0,-3.0,-9.0,1.0,3.0,-1.0]|
+----------+------------------------------------------+



- Polynomial expansion is the process of expanding your features into a polynomial space, which is formulated by an n-degree combination of original dimensions. A PolynomialExpansion class provides this functionality. The example above shows how to expand your features into a 3-degree polynomial space.
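For two input features, the expansion is the list of all monomials $x^i y^j$ with $1 \le i + j \le$ degree. The sketch below is limited to two features; `expand_two_features` is a hypothetical helper whose loop order was chosen so that the result lines up with the `polyFeatures` output shown above.

```python
def expand_two_features(x, y, degree):
    """Return all monomials x**i * y**j with 1 <= i + j <= degree."""
    terms = []
    for j in range(degree + 1):          # power of y
        for i in range(degree - j + 1):  # power of x
            if i + j == 0:
                continue  # skip the constant term
            terms.append((x ** i) * (y ** j))
    return terms


expanded = expand_two_features(2.0, 1.0, degree=3)
print(expanded)  # [2.0, 4.0, 8.0, 1.0, 2.0, 4.0, 1.0, 2.0, 1.0]
```

The nine entries correspond to $x, x^2, x^3, y, xy, x^2y, y^2, xy^2, y^3$.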



- **2.2.7 Discrete Cosine Transform (DCT)**
<a id = 'Discrete Cosine Transform'></a>

In [12]:
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = sqlContext.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)

+----------------------------------------------------------------+
|featuresDCT                                                     |
+----------------------------------------------------------------+
|[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
|[-1.0,3.378492794482933,-7.000000000000001,2.9301512653149677]  |
|[4.0,9.304453421915744,11.000000000000002,1.5579302036357163]   |
+----------------------------------------------------------------+



- The Discrete Cosine Transform transforms a length $N$ real-valued sequence in the time domain into another length $N$ real-valued sequence in the frequency domain. A DCT class provides this functionality, implementing the DCT-II and scaling the result by $1/\sqrt{2}$ such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the $0$th element of the transformed sequence is the $0$th DCT coefficient and not the $N/2$th).
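A direct, unoptimised implementation of the orthonormal DCT-II helps to see what the transform computes. This is a sketch of the textbook formula with the usual orthonormal scaling, not Spark's implementation; `dct_ii_unitary` is a hypothetical name.

```python
import math


def dct_ii_unitary(xs):
    """Orthonormal DCT-II computed straight from the definition (O(N^2))."""
    n = len(xs)
    coefficients = []
    for k in range(n):
        total = sum(
            x * math.cos(math.pi / n * (i + 0.5) * k) for i, x in enumerate(xs)
        )
        # The 0th coefficient gets the extra 1/sqrt(2) factor so that the
        # transform matrix is unitary.
        scale = math.sqrt(1.0 / n) if k == 0 else math.sqrt(2.0 / n)
        coefficients.append(scale * total)
    return coefficients


coefficients = dct_ii_unitary([0.0, 1.0, -2.0, 3.0])
print(coefficients)
```

Up to floating-point rounding, the result agrees with the first row of the `featuresDCT` output above.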



- **2.2.8 StringIndexer**
<a id = 'StringIndexer'></a>

In [13]:
from pyspark.ml.feature import StringIndexer

df = sqlContext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+



- StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0. Unseen labels will be put at index numLabels if the user chooses to keep them. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.
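Frequency-ordered indexing can be sketched with `collections.Counter`. The helper `fit_label_index` is hypothetical, and the alphabetical tie-break is an assumption made only to keep the sketch deterministic.

```python
from collections import Counter


def fit_label_index(labels):
    """Assign index 0 to the most frequent label, 1 to the next, and so on."""
    counts = Counter(labels)
    # Assumption: labels with equal counts are ordered alphabetically.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


categories = ["a", "b", "c", "a", "a", "c"]
label_index = fit_label_index(categories)
indexed_categories = [label_index[c] for c in categories]
print(label_index)
print(indexed_categories)
```

Here "a" occurs three times, "c" twice and "b" once, which gives the same indices as the `categoryIndex` column above.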



- **2.2.9 IndexToString**
<a id = 'IndexToString'></a>

In [15]:
from pyspark.ml.feature import IndexToString, StringIndexer

df = sqlContext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()

Transformed string column 'category' to indexed column 'categoryIndex'
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

StringIndexer will store labels in output column metadata

Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
|  0|          0.0|               a|
|  1|          2.0|               b|
|  2|          1.0|               c|
|  3|          0.0|               a|
|  4|          0.0|               a|
|  5|          1.0|               c|
+---+-------------+----------------+



- Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. A common use case is to produce indices from labels with StringIndexer, train a model with those indices and retrieve the original labels from the column of predicted indices with IndexToString. However, you are free to supply your own labels.
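Conceptually, the reverse mapping is a lookup in an ordered list of labels. In the sketch below the list is written out by hand to match the indices in the output above, whereas IndexToString reads it from column metadata or from labels you supply; `index_to_string` is a hypothetical helper.

```python
def index_to_string(indices, labels):
    """Look up the original label for each (float) index."""
    return [labels[int(i)] for i in indices]


# Position in the list is the index: 0.0 -> "a", 1.0 -> "c", 2.0 -> "b".
labels = ["a", "c", "b"]
restored = index_to_string([0.0, 2.0, 1.0, 0.0, 0.0, 1.0], labels)
print(restored)  # ['a', 'b', 'c', 'a', 'a', 'c']
```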

- **2.2.10 OneHotEncoder**
<a id = 'OneHotEncoder'></a>

In [16]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = sqlContext.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

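encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
# Note: in Spark 3.0 and later OneHotEncoder is an Estimator, so the next line
# becomes encoder.fit(indexed).transform(indexed).
encoded = encoder.transform(indexed)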
encoded.show()

+---+--------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|
+---+--------+-------------+-------------+
|  0|       a|          0.0|(2,[0],[1.0])|
|  1|       b|          2.0|    (2,[],[])|
|  2|       c|          1.0|(2,[1],[1.0])|
|  3|       a|          0.0|(2,[0],[1.0])|
|  4|       a|          0.0|(2,[0],[1.0])|
|  5|       c|          1.0|(2,[1],[1.0])|
+---+--------+-------------+-------------+



- **2.2.11 VectorIndexer**
<a id = 'VectorIndexer'></a>

In [17]:
from pyspark.ml.feature import VectorIndexer

data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()


Chose 351 categorical features: 645, 69, 365, 138, 101, 479, 333, 249, 0, 555, 666, 88, 170, 115, 276, 308, 5, 449, 120, 247, 614, 677, 202, 10, 56, 533, 142, 500, 340, 670, 174, 42, 417, 24, 37, 25, 257, 389, 52, 14, 504, 110, 587, 619, 196, 559, 638, 20, 421, 46, 93, 284, 228, 448, 57, 78, 29, 475, 164, 591, 646, 253, 106, 121, 84, 480, 147, 280, 61, 221, 396, 89, 133, 116, 1, 507, 312, 74, 307, 452, 6, 248, 60, 117, 678, 529, 85, 201, 220, 366, 534, 102, 334, 28, 38, 561, 392, 70, 424, 192, 21, 137, 165, 33, 92, 229, 252, 197, 361, 65, 97, 665, 583, 285, 224, 650, 615, 9, 53, 169, 593, 141, 610, 420, 109, 256, 225, 339, 77, 193, 669, 476, 642, 637, 590, 679, 96, 393, 647, 173, 13, 41, 503, 134, 73, 105, 2, 508, 311, 558, 674, 530, 586, 618, 166, 32, 34, 148, 45, 161, 279, 64, 689, 17, 149, 584, 562, 176, 423, 191, 22, 44, 59, 118, 281, 27, 641, 71, 391, 12, 445, 54, 313, 611, 144, 49, 335, 86, 672, 172, 113, 681, 219, 419, 81, 230, 362, 451, 76, 7, 39, 649, 98, 616, 477, 367, 535, 1

- VectorIndexer helps index categorical features in datasets of Vectors. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:

    - Take an input column of type Vector and a parameter maxCategories.
    - Decide which features should be categorical based on the number of distinct values, where features with at most maxCategories are declared categorical.
    - Compute 0-based category indices for each categorical feature.
    - Index categorical features and transform original feature values to indices.
- Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.
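The steps listed above can be sketched on plain Python lists. The helpers `fit_category_maps` and `transform_row` are hypothetical, and numbering the categories in sorted order of their original values is an assumption of this sketch rather than a description of how VectorIndexer orders them.

```python
def fit_category_maps(rows, max_categories):
    """Return {feature index: {original value: category index}} for every
    feature with at most max_categories distinct values."""
    category_maps = {}
    for j in range(len(rows[0])):
        distinct = sorted({row[j] for row in rows})
        if len(distinct) <= max_categories:
            # Assumption: categories are numbered in sorted value order.
            category_maps[j] = {value: idx for idx, value in enumerate(distinct)}
    return category_maps


def transform_row(row, category_maps):
    """Replace categorical values by their index, leave the rest untouched."""
    return [
        float(category_maps[j][v]) if j in category_maps else v
        for j, v in enumerate(row)
    ]


rows = [[-1.0, 1.5], [0.0, 2.5], [1.0, 3.5], [0.0, 4.5]]
category_maps = fit_category_maps(rows, max_categories=3)
indexed_rows = [transform_row(row, category_maps) for row in rows]
print(category_maps)
print(indexed_rows)
```

Feature 0 has three distinct values and is treated as categorical, while feature 1 has four and stays continuous.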

- **2.2.12 Normalizer**
<a id = 'Normalizer'></a>

In [18]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = sqlContext.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()

Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+

Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+



- Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm. It takes parameter p, which specifies the p-norm used for normalization ($p = 2$ by default). This normalization can help standardize your input data and improve the behavior of learning algorithms.
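Dividing a vector by its p-norm is short enough to write out by hand. The helpers `p_norm` and `normalize` are hypothetical; returning a zero vector unchanged is an assumption of this sketch.

```python
import math


def p_norm(vector, p):
    """Compute the p-norm; p = inf gives the largest absolute value."""
    if math.isinf(p):
        return max(abs(x) for x in vector)
    return sum(abs(x) ** p for x in vector) ** (1.0 / p)


def normalize(vector, p=2.0):
    """Scale the vector so that its p-norm equals 1."""
    norm = p_norm(vector, p)
    # Assumption: a zero vector is returned unchanged.
    return [x / norm for x in vector] if norm else list(vector)


l1_normalized = normalize([1.0, 0.5, -1.0], p=1.0)
linf_normalized = normalize([4.0, 10.0, 2.0], p=float("inf"))
print(l1_normalized)
print(linf_normalized)
```

The two results correspond to the first row of the $L^1$ table and the last row of the $L^\infty$ table above.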

- **2.2.13 StandardScaler**
<a id = 'StandardScaler'></a>

In [19]:
from pyspark.ml.feature import StandardScaler

dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|


- StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean. It takes parameters:

    - withStd: True by default. Scales the data to unit standard deviation.
    - withMean: False by default. Centers the data with mean before scaling. It will build a dense output, so take care when applying to sparse input.
- StandardScaler is an Estimator which can be fit on a dataset to produce a StandardScalerModel; this amounts to computing summary statistics. The model can then transform a Vector column in a dataset to have unit standard deviation and/or zero mean features.
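
The effect of withStd and withMean can be sketched in plain Python. The helper `standard_scale` and the sample data are hypothetical. The sketch assumes the sample (n - 1) standard deviation and maps constant columns to 0.0, which is my reading of Spark's behavior rather than something shown in this tutorial.

```python
import statistics

def standard_scale(rows, with_std=True, with_mean=False):
    """Scale each column of rows (a list of equal-length lists)."""
    columns = list(zip(*rows))
    means = [statistics.mean(c) for c in columns]
    # Assumption: sample (n - 1) standard deviation.
    stds = [statistics.stdev(c) for c in columns]
    scaled = []
    for row in rows:
        out = []
        for x, mu, sd in zip(row, means, stds):
            if with_mean:
                x = x - mu
            if with_std:
                # A constant column has sd == 0; map it to 0.0 instead of dividing.
                x = x / sd if sd != 0.0 else 0.0
            out.append(x)
        scaled.append(out)
    return scaled

data = [[1.0, 10.0], [2.0, 20.0], [3.0, 60.0]]
unit_std = standard_scale(data)                   # withStd=True, withMean=False
centered = standard_scale(data, with_mean=True)   # also centers each column
print(unit_std)
print(centered)
```

Centering turns every zero entry into a non-zero one, which is why withMean=True produces dense output.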

- **2.2.14 MinMaxScaler**
<a id = 'MinMaxScaler'></a>

In [20]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = sqlContext.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()

Features scaled to range: [0.000000, 1.000000]
+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+



- MinMaxScaler transforms a dataset of Vector rows, rescaling each feature to a specific range (often [0, 1]). It takes parameters:

    - min: 0.0 by default. Lower bound after transformation, shared by all features.
    - max: 1.0 by default. Upper bound after transformation, shared by all features.
- MinMaxScaler computes summary statistics on a data set and produces a MinMaxScalerModel. The model can then transform each feature individually such that it is in the given range.

- The rescaled value for a feature $E$ is calculated as

  $$\mathrm{Rescaled}(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} \cdot (max - min) + min$$

- For the case $E_{max} = E_{min}$, $\mathrm{Rescaled}(e_i) = 0.5 \cdot (max + min)$.
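
The rescaling rule above, including the special case where a feature is constant, can be sketched in plain Python. The function name `min_max_rescale` is hypothetical; the first input is the second feature column of the example data.

```python
def min_max_rescale(column, new_min=0.0, new_max=1.0):
    """Rescale one feature column to the range [new_min, new_max]."""
    e_min, e_max = min(column), max(column)
    if e_max == e_min:
        # Constant feature: every value maps to the middle of the target range.
        return [0.5 * (new_max + new_min) for _ in column]
    span = e_max - e_min
    return [(e - e_min) / span * (new_max - new_min) + new_min for e in column]

second_feature = [0.1, 1.1, 10.1]
print(min_max_rescale(second_feature))               # close to 0.0, 0.1, 1.0
print(min_max_rescale([5.0, 5.0, 5.0]))              # [0.5, 0.5, 0.5]
print(min_max_rescale([1.0, 2.0, 3.0], -1.0, 1.0))   # [-1.0, 0.0, 1.0]
```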

- **2.2.15 MaxAbsScaler**
<a id = 'MaxAbsScaler'></a>

In [21]:
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = sqlContext.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+



- MaxAbsScaler transforms a dataset of Vector rows, rescaling each feature to range [-1, 1] by dividing through the maximum absolute value in each feature. It does not shift/center the data, and thus does not destroy any sparsity.

- MaxAbsScaler computes summary statistics on a data set and produces a MaxAbsScalerModel. The model can then transform each feature individually to range [-1, 1].
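
As a small plain-Python sketch of the same idea (the helper name `max_abs_rescale` is hypothetical), each feature column is divided by its largest absolute value. Zeros stay zero, which is why sparsity is preserved.

```python
def max_abs_rescale(column):
    """Divide one feature column by its maximum absolute value."""
    max_abs = max(abs(x) for x in column)
    if max_abs == 0.0:
        return list(column)   # all-zero column: nothing to scale
    return [x / max_abs for x in column]

third_feature = [-8.0, -4.0, 8.0]
print(max_abs_rescale(third_feature))     # [-1.0, -0.5, 1.0]
print(max_abs_rescale([0.0, 2.0, -4.0]))  # [0.0, 0.5, -1.0]
```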

- **2.2.16 Bucketizer**
<a id = 'Bucketizer'></a>

In [22]:
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = sqlContext.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()

Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+



- Bucketizer transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users. It takes a parameter:

    - splits: Parameter for mapping continuous features into buckets. With n+1 splits, there are n buckets. A bucket defined by splits x,y holds values in the range [x,y) except the last bucket, which also includes y. Splits should be strictly increasing. Values at -inf, inf must be explicitly provided to cover all Double values; otherwise, values outside the splits specified will be treated as errors. Two examples of splits in Python are [-float("inf"), 0.0, 1.0, float("inf")] and [0.0, 1.0, 2.0].
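
The bucket boundaries described above can be checked with a plain-Python sketch built on the standard `bisect` module. The function `bucket_index` is hypothetical and only mirrors the rules stated in the text: half-open buckets, a closed last bucket, and an error outside the splits.

```python
from bisect import bisect_right

def bucket_index(value, splits):
    """Return the bucket index of value for strictly increasing splits."""
    n_buckets = len(splits) - 1
    if value < splits[0] or value > splits[-1]:
        raise ValueError("value %r is outside the splits" % (value,))
    if value == splits[-1]:
        # The last bucket also includes its upper bound.
        return float(n_buckets - 1)
    return float(bisect_right(splits, value) - 1)

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
values = [-999.9, -0.5, -0.3, 0.0, 0.2, 999.9]
buckets = [bucket_index(v, splits) for v in values]
print(buckets)   # [0.0, 1.0, 1.0, 2.0, 2.0, 3.0]
```

With splits such as [0.0, 1.0, 2.0], the value 2.0 falls in the last bucket, while 2.5 raises an error in this sketch.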

- **2.2.17 ElementwiseProduct**
<a id = 'ElementwiseProduct'></a>

In [23]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = sqlContext.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()

+-------------+-----------------+
|       vector|transformedVector|
+-------------+-----------------+
|[1.0,2.0,3.0]|    [0.0,2.0,6.0]|
|[4.0,5.0,6.0]|   [0.0,5.0,12.0]|
+-------------+-----------------+



- ElementwiseProduct multiplies each input vector by a provided “weight” vector, using element-wise multiplication. In other words, it scales each column of the dataset by a scalar multiplier. This represents the Hadamard product between the input vector, v and transforming vector, w, to yield a result vector.
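
A minimal plain-Python sketch of the Hadamard product, using the vectors from the example above (the function name `elementwise_product` is hypothetical):

```python
def elementwise_product(v, w):
    """Multiply two equal-length vectors element by element."""
    if len(v) != len(w):
        raise ValueError("vectors must have the same length")
    return [vi * wi for vi, wi in zip(v, w)]

scaling_vec = [0.0, 1.0, 2.0]
vectors = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
transformed = [elementwise_product(v, scaling_vec) for v in vectors]
print(transformed)   # [[0.0, 2.0, 6.0], [0.0, 5.0, 12.0]]
```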


- **2.2.18 SQLTransformer**
<a id = 'SQLTransformer'></a>

In [24]:
from pyspark.ml.feature import SQLTransformer

df = sqlContext.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()


+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+



- SQLTransformer implements the transformations which are defined by a SQL statement. Currently we only support SQL syntax like `SELECT ... FROM __THIS__ ...` where `__THIS__` represents the underlying table of the input dataset. The select clause specifies the fields, constants, and expressions to display in the output, and can be any select clause that Spark SQL supports. Users can also use Spark SQL built-in functions and UDFs to operate on these selected columns. For example, SQLTransformer supports statements like:

- `SELECT a, a + b AS a_b FROM __THIS__`
- `SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ WHERE a > 5`
- `SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b`
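
To make the role of the `__THIS__` placeholder concrete, here is a rough sketch that uses Python's built-in `sqlite3` module as a stand-in for Spark SQL. This is an assumption made purely for illustration: the helper `sql_transform` and the table name `input_table` are hypothetical, and Spark does not use SQLite. The placeholder is simply replaced by the name of a table holding the input rows before the statement runs.

```python
import sqlite3

def sql_transform(statement, columns, rows, table="input_table"):
    """Run statement against rows after substituting the __THIS__ placeholder."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE %s (%s)" % (table, ", ".join(columns)))
        placeholders = ", ".join("?" for _ in columns)
        conn.executemany("INSERT INTO %s VALUES (%s)" % (table, placeholders), rows)
        cursor = conn.execute(statement.replace("__THIS__", table))
        names = [d[0] for d in cursor.description]
        return names, cursor.fetchall()
    finally:
        conn.close()

names, result = sql_transform(
    "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__",
    ["id", "v1", "v2"],
    [(0, 1.0, 3.0), (2, 2.0, 5.0)])
print(names)    # ['id', 'v1', 'v2', 'v3', 'v4']
print(result)   # [(0, 1.0, 3.0, 4.0, 3.0), (2, 2.0, 5.0, 7.0, 10.0)]
```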

- **2.2.19 VectorAssembler**
<a id = 'VectorAssembler'></a>

In [25]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = sqlContext.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+



- VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. In each row, the values of the input columns will be concatenated into a vector in the specified order.
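
The concatenation can be pictured with a plain-Python sketch. The function `assemble` is hypothetical, and a dictionary stands in for one DataFrame row: scalar columns contribute one value each, vector columns contribute all of their entries, in the order the columns are listed.

```python
def assemble(row, input_cols):
    """Concatenate the listed columns of one row into a flat list of floats."""
    out = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            out.extend(float(x) for x in value)   # vector column
        else:
            out.append(float(value))              # numeric or boolean column
    return out

row = {"id": 0, "hour": 18, "mobile": 1.0,
       "userFeatures": [0.0, 10.0, 0.5], "clicked": 1.0}
features = assemble(row, ["hour", "mobile", "userFeatures"])
print(features)   # [18.0, 1.0, 0.0, 10.0, 0.5]
```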

- **2.3 Feature Selectors**
<a id = 'Feature Selectors'></a>
    - **2.3.1 VectorSlicer**
    <a id = 'VectorSlicer'></a>

In [26]:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = sqlContext.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()

+--------------------+-------------+
|        userFeatures|     features|
+--------------------+-------------+
|(3,[0,1],[-2.0,2.3])|(1,[0],[2.3])|
|      [-2.0,2.3,0.0]|        [2.3]|
+--------------------+-------------+



- VectorSlicer is a transformer that takes a feature vector and outputs a new feature vector with a sub-array of the original features. It is useful for extracting features from a vector column.

- VectorSlicer accepts a vector column with specified indices, then outputs a new vector column whose values are selected via those indices. There are two types of indices,

- Integer indices that represent the indices into the vector, setIndices().

- String indices that represent the names of features into the vector, setNames(). This requires the vector column to have an AttributeGroup since the implementation matches on the name field of an Attribute.

- Specification by integer and string are both acceptable. Moreover, you can use integer index and string name simultaneously. At least one feature must be selected. Duplicate features are not allowed, so there can be no overlap between selected indices and names. Note that if names of features are selected, an exception will be thrown if empty input attributes are encountered.

- The output vector will order features with the selected indices first (in the order given), followed by the selected names (in the order given).
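
The selection and ordering rules above can be sketched in plain Python. The function `slice_vector` and the feature names `f1`, `f2`, `f3` are hypothetical; in Spark the names would come from the AttributeGroup of the vector column.

```python
def slice_vector(vector, indices=(), names=(), feature_names=None):
    """Select features by integer index first, then by name."""
    name_indices = []
    if names:
        if not feature_names:
            raise ValueError("names were given but the vector has no feature names")
        name_indices = [feature_names.index(n) for n in names]
    selected = list(indices) + name_indices
    if not selected:
        raise ValueError("at least one feature must be selected")
    if len(set(selected)) != len(selected):
        raise ValueError("duplicate features are not allowed")
    return [vector[i] for i in selected]

user_features = [-2.0, 2.3, 0.0]
feature_names = ["f1", "f2", "f3"]   # hypothetical attribute names
print(slice_vector(user_features, indices=[1]))   # [2.3]
print(slice_vector(user_features, indices=[2], names=["f1"],
                   feature_names=feature_names))  # [0.0, -2.0]
```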

- **2.3.2 RFormula**
<a id = 'RFormula'></a>

In [27]:
from pyspark.ml.feature import RFormula

dataset = sqlContext.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()

+--------------+-----+
|      features|label|
+--------------+-----+
|[0.0,0.0,18.0]|  1.0|
|[0.0,1.0,12.0]|  0.0|
|[1.0,0.0,15.0]|  0.0|
+--------------+-----+



- RFormula selects columns specified by an R model formula. Currently we support a limited subset of the R operators, including ‘~’, ‘.’, ‘:’, ‘+’, and ‘-’. The basic operators are:

- `~` separate target and terms
- `+` concat terms, “+ 0” means removing intercept
- `-` remove a term, “- 1” means removing intercept
- `:` interaction (multiplication for numeric values, or binarized categorical values)
- `.` all columns except target

- Suppose a and b are double columns. We use the following simple examples to illustrate the effect of RFormula:

- y ~ a + b means model y ~ w0 + w1 * a + w2 * b where w0 is the intercept and w1, w2 are coefficients.
- y ~ a + b + a:b - 1 means model y ~ w1 * a + w2 * b + w3 * a * b where w1, w2, w3 are coefficients.
- RFormula produces a vector column of features and a double or string column of label. Like when formulas are used in R for linear regression, string input columns will be one-hot encoded, and numeric columns will be cast to doubles. If the label column is of type string, it will be first transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.
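
The two example formulas for numeric columns a and b can be illustrated with a plain-Python sketch. The helper names and the coefficient values are hypothetical; the intercept is treated as a parameter of the model rather than a feature, so "- 1" corresponds to an intercept of zero.

```python
def formula_features(a, b, with_interaction=False):
    """Feature vector for 'y ~ a + b', optionally with the a:b interaction term."""
    feats = [a, b]
    if with_interaction:
        feats.append(a * b)   # ':' multiplies numeric columns
    return feats

def linear_model(feats, coefficients, intercept=0.0):
    """Evaluate intercept + sum(w_i * x_i)."""
    return intercept + sum(w * x for w, x in zip(coefficients, feats))

a, b = 2.0, 3.0
# y ~ a + b            ->  w0 + w1*a + w2*b
y1 = linear_model(formula_features(a, b), [0.5, 1.0], intercept=1.0)
# y ~ a + b + a:b - 1  ->  w1*a + w2*b + w3*a*b (no intercept)
y2 = linear_model(formula_features(a, b, True), [0.5, 1.0, 2.0])
print(y1, y2)   # 5.0 16.0
```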

- **2.3.3 ChiSqSelector**
<a id = 'ChiSqSelector'></a>

In [28]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = sqlContext.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()


ChiSqSelector output with top 1 features selected
+---+------------------+-------+----------------+
| id|          features|clicked|selectedFeatures|
+---+------------------+-------+----------------+
|  7|[0.0,0.0,18.0,1.0]|    1.0|          [18.0]|
|  8|[0.0,1.0,12.0,0.0]|    0.0|          [12.0]|
|  9|[1.0,0.0,15.0,0.1]|    0.0|          [15.0]|
+---+------------------+-------+----------------+



- ChiSqSelector stands for Chi-Squared feature selection. It operates on labeled data with categorical features. ChiSqSelector uses the Chi-Squared test of independence to decide which features to choose. It supports five selection methods: numTopFeatures, percentile, fpr, fdr, fwe:
    - numTopFeatures chooses a fixed number of top features according to a chi-squared test. This is akin to yielding the features with the most predictive power.
    - percentile is similar to numTopFeatures but chooses a fraction of all features instead of a fixed number.
    - fpr chooses all features whose p-values are below a threshold, thus controlling the false positive rate of selection.
    - fdr uses the Benjamini-Hochberg procedure to choose all features whose false discovery rate is below a threshold.
    - fwe chooses all features whose p-values are below a threshold. The threshold is scaled by 1/numFeatures, thus controlling the family-wise error rate of selection.
- By default, the selection method is numTopFeatures, with the default number of top features set to 50. The user can choose a selection method using setSelectorType.
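
The five selection methods differ only in how they turn per-feature p-values into a set of selected features. The sketch below is a plain-Python illustration of those rules, not Spark's code: the function `select_features`, its parameter names, and the p-values are all hypothetical, and the p-values would in practice come from the chi-squared test.

```python
def select_features(p_values, method="numTopFeatures", num_top=50,
                    percentile=0.1, alpha=0.05):
    """Return the sorted indices of the features chosen by the given method."""
    m = len(p_values)
    # Rank features by p-value; the sort is stable, so ties keep index order.
    ranked = sorted(range(m), key=lambda i: p_values[i])
    if method == "numTopFeatures":
        chosen = ranked[:num_top]
    elif method == "percentile":
        chosen = ranked[:int(m * percentile)]
    elif method == "fpr":
        chosen = [i for i in ranked if p_values[i] < alpha]
    elif method == "fdr":
        # Benjamini-Hochberg: keep the k smallest p-values, where k is the
        # largest rank with p_(k) <= alpha * k / m.
        k = 0
        for rank, i in enumerate(ranked, start=1):
            if p_values[i] <= alpha * rank / m:
                k = rank
        chosen = ranked[:k]
    elif method == "fwe":
        chosen = [i for i in ranked if p_values[i] < alpha / m]
    else:
        raise ValueError("unknown method: %s" % method)
    return sorted(chosen)

p = [0.012, 0.045, 0.02, 0.20, 0.005]   # made-up p-values for five features
print(select_features(p, "numTopFeatures", num_top=2))   # [0, 4]
print(select_features(p, "percentile", percentile=0.4))  # [0, 4]
print(select_features(p, "fpr"))                         # [0, 1, 2, 4]
print(select_features(p, "fdr"))                         # [0, 2, 4]
print(select_features(p, "fwe"))                         # [4]
```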

# 3. Unlabeled topic modeling
<a id = 'Unlable topic modeling'></a>
- **3.1 LDA Modeling**
<a id = 'LDA'></a>

In [31]:
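from pyspark.sql import Row
from pyspark.ml.feature import StopWordsRemover, CountVectorizer
from pyspark.mllib.linalg import Vectors      # mllib Vectors provides fromML()
from pyspark.mllib.clustering import LDA      # RDD-based LDA, used via LDA.train below

# One review per line: split into words and attach the line index as uid
data = sc.textFile("data/airlines.csv").zipWithIndex().map(lambda pair: Row(uid=pair[1], words=pair[0].split(" ")))
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

docDF = sqlContext.createDataFrame(data)
docDF = remover.transform(docDF)
Vector = CountVectorizer(inputCol="filtered", outputCol="vectors")
model = Vector.fit(docDF)
result = model.transform(docDF)

# Convert ml vectors to mllib vectors, the format LDA.train expects
corpus = result.select("uid", "vectors").rdd.map(lambda row: [row[0], Vectors.fromML(row[1])]).cache()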

In [15]:
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3,maxIterations=100,optimizer='online')
topics = ldaModel.topicsMatrix()
vocabArray = model.vocabulary

wordNumbers = 10  # number of words per topic
topicIndices = sc.parallelize(ldaModel.describeTopics(maxTermsPerTopic = wordNumbers))

def topic_render(topic):  # map the term indices of a topic to the actual words
    terms = topic[0]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result

topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()

for topic in range(len(topics_final)):
    print ("Topic" + str(topic) + ":")
    for term in topics_final[topic]:
        print (term)
    print ('\n')

Topic0:
fish
lamb
UAL
flatbeds
outlets.
display
recliner
fairness
primary
no-fly


Topic1:
flight
time
flights
service
us
-
plane
hours
get
United


Topic2:
global
flight
slipper
boxes
Lisbon
alone.
smiles.
dead
salad
Minot




- Note: LDA did not perform well here with the EMLDAOptimizer, which is used by default: with it, the topics showed a significant bias toward the most popular terms. I used the OnlineLDAOptimizer instead. It implements the online variational Bayes LDA algorithm, which processes a subset of the corpus on each iteration and updates the term-topic distribution adaptively.

# LDA Topic Modeling with csv file
<a id = 'LDA Topic Modeling with csv file'></a>

In [3]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id
import re

In [4]:
# Load Data
rawdata = sqlContext.read.load("data/airlines2.csv", format="csv", header=True)
rawdata = rawdata.fillna({'review': ''})                               # Replace nulls with blank string
rawdata = rawdata.withColumn("uid", monotonically_increasing_id())     # Create Unique ID
rawdata = rawdata.withColumn("year_month", rawdata.date.substr(1,7))   # First 7 characters of date (e.g. '21-Jun-'); not a true YYYY-MM value for this date format
 
# Show rawdata (as DataFrame)
rawdata.show(10)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+---+----------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|uid|year_month|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+---+----------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|  0|   21-Jun-|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|  1|   19-Jun-|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|  2|   18-Jun-|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|  3|   17-Jun-|
|10005|Delta Air Lines|17-Jun-14| Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|  4|   17-Jun-|
|10006|Delta Air Lines|17-Jun-14|     USA|     9|Business|    5|        YES|Narita - Bangkok ...

- Create a new column, uid, as a unique ID for each review; it is used later when the words are vectorized into features.

In [5]:
# Print data types
for col_type in rawdata.dtypes:
    print(col_type)

target = rawdata.select(rawdata['rating'].cast(IntegerType()))
target.dtypes


('id', 'string')
('airline', 'string')
('date', 'string')
('location', 'string')
('rating', 'string')
('cabin', 'string')
('value', 'string')
('recommended', 'string')
('review', 'string')
('uid', 'bigint')
('year_month', 'string')


[('rating', 'int')]

- Check the data type of each column. Every column read from the CSV is a string, so rating is cast to an integer.

In [5]:
def cleanup_text(record):
    text  = record[8]
    uid   = record[9]
    words = text.split()  
    # Default list of Stopwords
    stopwords_core = ['a', u'about', u'above', u'after', u'again', u'against', u'all', u'am', u'an', u'and', u'any', u'are', u'arent', u'as', u'at', 
    u'be', u'because', u'been', u'before', u'being', u'below', u'between', u'both', u'but', u'by', 
    u'can', 'cant', 'come', u'could', 'couldnt', 
    u'd', u'did', u'didn', u'do', u'does', u'doesnt', u'doing', u'dont', u'down', u'during', 
    u'each', 
    u'few', 'finally', u'for', u'from', u'further', 
    u'had', u'hadnt', u'has', u'hasnt', u'have', u'havent', u'having', u'he', u'her', u'here', u'hers', u'herself', u'him', u'himself', u'his', u'how', 
    u'i', u'if', u'in', u'into', u'is', u'isnt', u'it', u'its', u'itself', 
    u'just', 
    u'll', 
    u'm', u'me', u'might', u'more', u'most', u'must', u'my', u'myself', 
    u'no', u'nor', u'not', u'now', 
    u'o', u'of', u'off', u'on', u'once', u'only', u'or', u'other', u'our', u'ours', u'ourselves', u'out', u'over', u'own', 
    u'r', u're', 
    u's', 'said', u'same', u'she', u'should', u'shouldnt', u'so', u'some', u'such', 
    u't', u'than', u'that', 'thats', u'the', u'their', u'theirs', u'them', u'themselves', u'then', u'there', u'these', u'they', u'this', u'those', u'through', u'to', u'too', 
    u'under', u'until', u'up', 
    u'very', 
    u'was', u'wasnt', u'we', u'were', u'werent', u'what', u'when', u'where', u'which', u'while', u'who', u'whom', u'why', u'will', u'with', u'wont', u'would', 
    u'y', u'you', u'your', u'yours', u'yourself', u'yourselves']
    
    # Custom List of Stopwords - Add your own here
    stopwords_custom = ['']
    stopwords = stopwords_core + stopwords_custom
    stopwords = [word.lower() for word in stopwords]    
    
    text_out = [re.sub('[^a-zA-Z0-9]','',word) for word in words]                                       # Remove special characters
    text_out = [word.lower() for word in text_out if len(word)>2 and word.lower() not in stopwords]     # Lowercase; drop stopwords and words of 2 characters or fewer
    return text_out

udf_cleantext = udf(cleanup_text , ArrayType(StringType()))
clean_text = rawdata.withColumn("words", udf_cleantext(struct([rawdata[x] for x in rawdata.columns])))

# tokenizer = Tokenizer(inputCol="description", outputCol="words")
# wordsData = tokenizer.transform(text)



- Create a words column (an array of cleaned tokens) from the review column of rawdata.

In [100]:
# Show first row of new dataset
clean_text.take(1)

[Row(id=u'10001', airline=u'Delta Air Lines', date=u'21-Jun-14', location=u'Thailand', rating=u'7', cabin=u'Economy', value=u'4', recommended=u'YES', review=u'Flew Mar 30 NRT to BKK. All flights were great. Flight was on-time and the in-flight entertainment was great. Apart from the meals - some Thai passengers cannot eat beef so the flight crews tried to ask other passengers who could eat beef and changed the meals around. We feel disappointed with their food services.', uid=0, year_month=u'21-Jun-', words=[u'flew', u'mar', u'nrt', u'bkk', u'flights', u'great', u'flight', u'ontime', u'inflight', u'entertainment', u'great', u'apart', u'meals', u'thai', u'passengers', u'cannot', u'eat', u'beef', u'flight', u'crews', u'tried', u'ask', u'passengers', u'eat', u'beef', u'changed', u'meals', u'around', u'feel', u'disappointed', u'food', u'services'])]

In [6]:
# Term Frequency Vectorization  - Option 2 (CountVectorizer)    : 
# Vectorize words arrays for each uid
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 1000)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)
# Find TF-IDF coefficients for each word instead of bag of words
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)


In [99]:
# Show first row of final dataset
rescaledData.take(1)

[Row(id=u'10001', airline=u'Delta Air Lines', date=u'21-Jun-14', location=u'Thailand', rating=u'7', cabin=u'Economy', value=u'4', recommended=u'YES', review=u'Flew Mar 30 NRT to BKK. All flights were great. Flight was on-time and the in-flight entertainment was great. Apart from the meals - some Thai passengers cannot eat beef so the flight crews tried to ask other passengers who could eat beef and changed the meals around. We feel disappointed with their food services.', uid=0, year_month=u'21-Jun-', words=[u'flew', u'mar', u'nrt', u'bkk', u'flights', u'great', u'flight', u'ontime', u'inflight', u'entertainment', u'great', u'apart', u'meals', u'thai', u'passengers', u'cannot', u'eat', u'beef', u'flight', u'crews', u'tried', u'ask', u'passengers', u'eat', u'beef', u'changed', u'meals', u'around', u'feel', u'disappointed', u'food', u'services'], rawFeatures=SparseVector(1000, {0: 2.0, 3: 1.0, 11: 1.0, 25: 1.0, 32: 2.0, 46: 1.0, 56: 2.0, 97: 1.0, 113: 1.0, 201: 1.0, 213: 1.0, 249: 2.0, 3

- Add a features column holding the TF-IDF weights computed from the words column of clean_text.
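
What the CountVectorizer and IDF steps compute can be sketched in plain Python. The function `tf_idf` and the toy documents are hypothetical; the smoothed formula log((m + 1) / (df + 1)) is the one given in the Spark documentation for IDF, where m is the number of documents and df is the number of documents containing the term.

```python
import math
from collections import Counter

def tf_idf(documents):
    """Return one {term: tf * idf} dictionary per tokenized document."""
    m = len(documents)
    doc_freq = Counter()
    for doc in documents:
        doc_freq.update(set(doc))          # count each term once per document
    idf = {t: math.log((m + 1.0) / (df + 1.0)) for t, df in doc_freq.items()}
    return [{t: count * idf[t] for t, count in Counter(doc).items()}
            for doc in documents]

docs = [["flight", "great", "great"], ["flight", "delayed"], ["flight", "food"]]
weights = tf_idf(docs)
print(weights[0])
```

A term that occurs in every document, such as "flight" here, gets a weight of zero, so frequent but uninformative words contribute less to the topics than they would with raw counts.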

In [62]:
# LDA Topic modeling
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")

ldamodel = lda.fit(rescaledData)

#model.isDistributed()
#model.vocabSize()

ldatopics = ldamodel.describeTopics()
ldatopics.show(25)

def map_termID_to_Word(termIndices):
    words = []
    for termID in termIndices:
        words.append(vocab_broadcast.value[termID])
    
    return words

udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn("topic_desc", udf_map_termID_to_Word(ldatopics.termIndices))
ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.topic_desc).show(25,False)

ldaResults = ldamodel.transform(rescaledData)

ldaResults.show()

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[106, 301, 432, 7...|[0.02642483914460...|
|    1|[218, 257, 312, 4...|[0.02860477465189...|
|    2|[869, 639, 155, 2...|[0.01437555629232...|
|    3|[139, 155, 50, 12...|[0.02946689912509...|
|    4|[582, 640, 33, 16...|[0.01636880051698...|
|    5|[498, 251, 48, 26...|[0.01998824201487...|
|    6|[197, 791, 88, 39...|[0.03639006364794...|
|    7|[459, 248, 386, 1...|[0.01892117257323...|
|    8|[237, 761, 78, 31...|[0.01975997181346...|
|    9|[411, 8, 629, 47,...|[0.01284467745242...|
|   10|[573, 796, 723, 1...|[0.01320751612923...|
|   11|[500, 363, 392, 2...|[0.02222368774827...|
|   12|[136, 182, 5, 22,...|[0.01628267590187...|
|   13|[780, 327, 368, 7...|[0.01180130842975...|
|   14|[19, 71, 29, 4, 3...|[0.01849898845552...|
|   15|[54, 8, 26, 389, ...|[0.02528842185304...|
|   16|[601, 435, 100, 6...|[0.01486335804649...|


In [78]:
# Breakout LDA Topics for Modeling and Reporting

def breakout_array(index_number, record):
    vectorlist = record.tolist()
    return vectorlist[index_number]

udf_breakout_array = udf(breakout_array, FloatType())
enrichedData = ldaResults                                                                   \
        .withColumn("Topic_12", udf_breakout_array(lit(12), ldaResults.topicDistribution))  \
        .withColumn("topic_20", udf_breakout_array(lit(20), ldaResults.topicDistribution))            

enrichedData.show()


+-----+---------------+---------+---------+------+--------+-----+-----------+--------------------+---+----------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+
|   id|        airline|     date| location|rating|   cabin|value|recommended|              review|uid|year_month|               words|         rawFeatures|            features|   topicDistribution|   Topic_12|   topic_20|
+-----+---------------+---------+---------+------+--------+-----+-----------+--------------------+---+----------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+
|10001|Delta Air Lines|21-Jun-14| Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|  0|   21-Jun-|[flew, mar, nrt, ...|(1000,[0,3,11,25,...|(1000,[0,3,11,25,...|[0.03326458340521...|0.025253555|  0.0619217|
|10002|Delta Air Lines|19-Jun-14|      USA|     0| Economy|    2|         NO|Flight 2463 leavi...|  1|   19-Jun-

In [80]:
#  Register Table for SparkSQL
enrichedData.createOrReplaceTempView("enrichedData")

sqlContext.sql("SELECT id, airline, date, rating, topic_12 FROM enrichedData")

sqlContext.sql("SELECT id, airline, date, rating, topic_20 FROM enrichedData")


DataFrame[id: string, airline: string, date: string, rating: string, topic_20: float]

# References

- https://spark.apache.org/docs/latest/ml-clustering.html#latent-dirichlet-allocation-lda

- https://community.hortonworks.com/articles/84781/spark-text-analytics-uncovering-data-driven-topics.html

- https://stackoverflow.com/questions/41386557/unable-to-use-structfield-with-pyspark
    
- https://stackoverflow.com/questions/42051184/latent-dirichlet-allocation-lda-in-spark/42200429#42200429
    
- https://github.com/zaratsian/Spark/blob/master/text_analytics_datadriven_topics.py
    
- http://nbviewer.jupyter.org/gist/mizvol/eb24770ac3d5d598463f972e2a669f03