In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [4]:
from pyspark.ml.feature import Tokenizer,RegexTokenizer
from pyspark.sql.functions import col,udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

In [5]:
sentenceDataFrame.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic,regressi...|
+---+--------------------+



In [8]:
tokenizer=Tokenizer(inputCol='sentence',outputCol='words')
tokenized=tokenizer.transform(sentenceDataFrame)
countTokens=udf(lambda words:len(words),IntegerType())
tokenized.withColumn('tokens',countTokens(col('words'))).show(truncate=False)

+---+-----------------------------------+------------------------------------------+------+
|id |sentence                           |words                                     |tokens|
+---+-----------------------------------+------------------------------------------+------+
|0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|1  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|2  |Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+---+-----------------------------------+------------------------------------------+------+



In [13]:
regexTokenizer=RegexTokenizer(inputCol='sentence',outputCol='words',pattern="\\W")
regexTokenized=regexTokenizer.transform(sentenceDataFrame)
regexTokenized.withColumn('tokens',countTokens(col('words'))).show(truncate=False)

+---+-----------------------------------+------------------------------------------+------+
|id |sentence                           |words                                     |tokens|
+---+-----------------------------------+------------------------------------------+------+
|0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|1  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|2  |Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+---+-----------------------------------+------------------------------------------+------+



In [14]:
#stop words

In [15]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])


In [17]:
remover=StopWordsRemover(inputCol='raw',outputCol='filtered')
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



In [18]:
##ngrams
from pyspark.ml.feature import NGram
wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])


In [21]:
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDF=ngram.transform(wordDataFrame)
ngramDF.show(1,truncate=False)

+---+----------------------------+-----------------------------------------+
|id |words                       |ngrams                                   |
+---+----------------------------+-----------------------------------------+
|0  |[Hi, I, heard, about, Spark]|[Hi I, I heard, heard about, about Spark]|
+---+----------------------------+-----------------------------------------+
only showing top 1 row



Feature Extractors
TF-IDF
Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. Denote a term by $t$, a document by d , and the corpus by D. Term frequency $TF(t, d)$ is the number of times that term $t$ appears in document $d$, while document frequency $DF(t, D)$ is the number of documents that contains term $t$. If we only use term frequency to measure the importance, it is very easy to over-emphasize terms that appear very often but carry little information about the document, e.g. “a”, “the”, and “of”. If a term appears very often across the corpus, it means it doesn’t carry special information about a particular document. Inverse document frequency is a numerical measure of how much information a term provides:

IDF(t,D)=log|D|+1DF(t,D)+1
 
where |D| is the total number of documents in the corpus. Since logarithm is used, if a term appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF:
TFIDF(t,d,D)=TF(t,d)⋅IDF(t,D).

In [23]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

sentenceData.show(truncate=False)

+-----+-----------------------------------+
|label|sentence                           |
+-----+-----------------------------------+
|0.0  |Hi I heard about Spark             |
|0.0  |I wish Java could use case classes |
|1.0  |Logistic regression models are neat|
+-----+-----------------------------------+



In [24]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show(truncate=False)

+-----+-----------------------------------+------------------------------------------+
|label|sentence                           |words                                     |
+-----+-----------------------------------+------------------------------------------+
|0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|
|1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |
+-----+-----------------------------------+------------------------------------------+



In [30]:
hashingTF=HashingTF(inputCol='words',outputCol='rawfeatures',numFeatures=20)
featurizedData=hashingTF.transform(wordsData)
featurizedData.select('words','rawfeatures').show(truncate=False)

+------------------------------------------+-----------------------------------------+
|words                                     |rawfeatures                              |
+------------------------------------------+-----------------------------------------+
|[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |
|[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |
|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|
+------------------------------------------+-----------------------------------------+



In [32]:
idf=IDF(inputCol='rawfeatures',outputCol='features')
idfModel=idf.fit(featurizedData)
rescaledData=idfModel.transform(featurizedData)
rescaledData.select('rawfeatures','features').show(truncate=False)

+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|rawfeatures                              |features                                                                                                              |
+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |(20,[0,5,9,17],[0.6931471805599453,0.6931471805599453,0.28768207245178085,1.3862943611198906])                        |
|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |(20,[2,7,9,13,15],[0.6931471805599453,0.6931471805599453,0.8630462173553426,0.28768207245178085,0.28768207245178085]) |
|(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|(20,[4,6,13,15,18],[0.6931471805599453,0.6931471805599453,0.28768207245178085,0.28768207245178085,0.6931471805599453])|
+---------------------

In [53]:
#count vectorizer
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.


In [54]:
cv=CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model=cv.fit(df)
result=model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+

