In [3]:
import findspark
findspark.init()

In [4]:
import pyspark
sc = pyspark.SparkContext()

# TF-IDF

In [5]:
from __future__ import print_function
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

In [6]:
# creating spark dataframe wiht the input data. You can also read the data from file. label represents the 3 documnets (0.0,0.1,0.2)
sentenceData = spark.createDataFrame([
        (0.0, "Welcome to KDM TF_IDF Tutorial."),
        (0.1, "Learn Spark ml tf_idf in today's lab."),
        (0.2, "Spark Mllib has TF-IDF.")
    ], ["label", "sentence"])
sentenceData

DataFrame[label: double, sentence: string]

In [7]:
# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

wordsData

DataFrame[label: double, sentence: string, words: array<string>]

In [8]:
# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

hashingTF

HashingTF_522136181a00

In [9]:
featurizedData

DataFrame[label: double, sentence: string, words: array<string>, rawFeatures: vector]

In [10]:
# calculating the IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

idf

IDF_92916260a260

In [11]:
rescaledData

DataFrame[label: double, sentence: string, words: array<string>, rawFeatures: vector, features: vector]

In [12]:
#displaying the results
rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[2,8,13,15,17...|
|  0.1|(20,[2,3,6,7],[0....|
|  0.2|(20,[6,14,15],[0....|
+-----+--------------------+



# N-Gram

In [14]:
from pyspark.ml.feature import NGram
#creating dataframe of input
wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

wordDataFrame

DataFrame[id: bigint, words: array<string>]

In [15]:
#creating NGrams with n=2 (two words)
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame

DataFrame[id: bigint, words: array<string>, ngrams: array<string>]

In [16]:
# displaying the results
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



# Word2Vec

In [17]:
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("McCarthy was asked to analyse the data from the first phase of trials of the vaccine.".split(" "), ),
    ("We have amassed the raw data and are about to begin analysing it.".split(" "), ),
    ("Without more data we cannot make a meaningful comparison of the two systems.".split(" "), ),
    ("Collecting data is a painfully slow process.".split(" "), ),
    ("You need a long series of data to be able to discern such a trend.".split(" "), )
], ["text"])

documentDF

DataFrame[text: array<string>]

In [18]:
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
result

DataFrame[text: array<string>, result: vector]

In [19]:
for row in result.collect():
    text, vector = row
    #printing the results
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [McCarthy, was, asked, to, analyse, the, data, from, the, first, phase, of, trials, of, the, vaccine.] => 
Vector: [0.035033310727158096,-0.032570564886555076,-0.004972728493157774]

Text: [We, have, amassed, the, raw, data, and, are, about, to, begin, analysing, it.] => 
Vector: [0.012736818824823087,-0.005370451047873268,-0.03244196294018856]

Text: [Without, more, data, we, cannot, make, a, meaningful, comparison, of, the, two, systems.] => 
Vector: [0.03442936315416144,0.0062795166785900415,0.04459071166526813]

Text: [Collecting, data, is, a, painfully, slow, process.] => 
Vector: [0.03440210635640791,-0.026394481105463843,-0.0007667035928794315]

Text: [You, need, a, long, series, of, data, to, be, able, to, discern, such, a, trend.] => 
Vector: [-0.034745236734549205,0.02233983635281523,0.0018616045514742534]



In [20]:
# showing the synonyms and cosine similarity of the word in input data
synonyms = model.findSynonyms("data", 5)   # its okay for certain words , real bad for others
synonyms.show(5)

+----------+------------------+
|      word|        similarity|
+----------+------------------+
|       the|0.9785391688346863|
|  process.| 0.968315064907074|
|       was|0.8777697682380676|
|Collecting|0.8135020136833191|
|    series|0.7547667622566223|
+----------+------------------+

