<a href="https://colab.research.google.com/github/smqhw/kdm1/blob/main/icp5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [24]:
!pip install pyspark



In [23]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec



In [25]:
# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

In [70]:
# creating spark dataframe wiht the input data. You can also read the data from file. label represents the 3 documnets (0.0,0.1,0.2)
sentenceData = spark.createDataFrame([
       (0.0, "Welcome to KDM TF_IDF Tutorial."),
       (0.1, "Learn Spark ml tf_idf in today's lab."),
        (0.2,"Spark Mllib has TF-IDF.")
     ], ["label", "sentence"])

In [77]:
import pandas as pd

In [80]:
# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

In [28]:
wordsData.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Welcome to KDM TF...|[welcome, to, kdm...|
|  0.1|Learn Spark ml tf...|[learn, spark, ml...|
|  0.2|Spark Mllib has T...|[spark, mllib, ha...|
+-----+--------------------+--------------------+



In [30]:
# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors"
featurizedData.show()

+-----+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|  0.0|Welcome to KDM TF...|[welcome, to, kdm...|(20,[2,8,13,15,17...|
|  0.1|Learn Spark ml tf...|[learn, spark, ml...|(20,[2,3,6,7],[2....|
|  0.2|Spark Mllib has T...|[spark, mllib, ha...|(20,[6,14,15],[2....|
+-----+--------------------+--------------------+--------------------+



In [34]:
# calculating the IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.show()

+-----+--------------------+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|            features|
+-----+--------------------+--------------------+--------------------+--------------------+
|  0.0|Welcome to KDM TF...|[welcome, to, kdm...|(20,[2,8,13,15,17...|(20,[2,8,13,15,17...|
|  0.1|Learn Spark ml tf...|[learn, spark, ml...|(20,[2,3,6,7],[2....|(20,[2,3,6,7],[0....|
|  0.2|Spark Mllib has T...|[spark, mllib, ha...|(20,[6,14,15],[2....|(20,[6,14,15],[0....|
+-----+--------------------+--------------------+--------------------+--------------------+



In [35]:
#displaying the results
rescaledData.select("label", "features").show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[2,8,13,15,17...|
|  0.1|(20,[2,3,6,7],[0....|
|  0.2|(20,[6,14,15],[0....|
+-----+--------------------+



In [36]:
spark2 = SparkSession.builder.appName("Ngram Example").getOrCreate()

In [40]:
#creating dataframe of input\r\n",
wordDataFrame = spark2.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish","Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

wordDataFrame.show()

+---+--------------------+
| id|               words|
+---+--------------------+
|  0|[Hi, I, heard, ab...|
|  1|[I, wish, Java, c...|
|  2|[Logistic, regres...|
+---+--------------------+



In [43]:
#creating NGrams with n=2 (two words)
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)


In [44]:
# displaying the results
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [45]:
# creating spark session
spark3 = SparkSession.builder.appName("Word2Vec Example").getOrCreate()

In [50]:
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark3.createDataFrame([
    ("McCarthy was asked to analyse the data from the first phase of trials of the vaccine.".split(" "), ),
    ("We have amassed the raw data and are about to begin analysing it.".split(" "), ),
    ("Without more data we cannot make a meaningful comparison of the two systems.".split(" "), ),
    ("Collecting data is a painfully slow process.".split(" "), ),
    ("You need a long series of data to be able to discern such a trend.".split(" "), )
], ["text"])
documentDF.show(truncate=False)

+------------------------------------------------------------------------------------------------------+
|text                                                                                                  |
+------------------------------------------------------------------------------------------------------+
|[McCarthy, was, asked, to, analyse, the, data, from, the, first, phase, of, trials, of, the, vaccine.]|
|[We, have, amassed, the, raw, data, and, are, about, to, begin, analysing, it.]                       |
|[Without, more, data, we, cannot, make, a, meaningful, comparison, of, the, two, systems.]            |
|[Collecting, data, is, a, painfully, slow, process.]                                                  |
|[You, need, a, long, series, of, data, to, be, able, to, discern, such, a, trend.]                    |
+------------------------------------------------------------------------------------------------------+



In [62]:
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=3, inputCol="text", outputCol="result")  
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
result.show(truncate=False)

+------------------------------------------------------------------------------------------------------+------------------------------------------------------------------+
|text                                                                                                  |result                                                            |
+------------------------------------------------------------------------------------------------------+------------------------------------------------------------------+
|[McCarthy, was, asked, to, analyse, the, data, from, the, first, phase, of, trials, of, the, vaccine.]|[0.009438527398742735,0.014125154353678226,-1.5464494936168194E-4]|
|[We, have, amassed, the, raw, data, and, are, about, to, begin, analysing, it.]                       |[0.008283712113132844,0.01625597133086278,-0.0036636972083495217] |
|[Without, more, data, we, cannot, make, a, meaningful, comparison, of, the, two, systems.]            |[-0.004112093064647455,0.00545386626

In [53]:
for row in result.collect():
    text, vector = row
#printing the results
print("Text: [%s] => \\nVector: %s\\n" % (", ".join(text), str(vector)))

Text: [You, need, a, long, series, of, data, to, be, able, to, discern, such, a, trend.] => \nVector: [0.004335567106803258,0.05279908715747297,-0.02296845242381096]\n


In [54]:
# showing the synonyms and cosine similarity of the word in input data
synonyms = model.findSynonyms("data", 10)   
# its okay for certain words , real bad for others
synonyms.show(10)

+---------+------------------+
|     word|        similarity|
+---------+------------------+
|painfully| 0.979426383972168|
|    asked| 0.889884352684021|
|       be| 0.866216242313385|
|     long|0.8165060877799988|
|    begin| 0.814436137676239|
|    about|0.7007120847702026|
|       to|0.6835429668426514|
|       of| 0.676819384098053|
|  analyse|0.6697208881378174|
|     more|0.6340227127075195|
+---------+------------------+



In [40]:
#closing the spark sessions\r\n",
spark.stop()
spark2.stop()
spark3.stop()

In [86]:
with open("/content/text/a1.txt","r+") as a1:
 doc1 = a1.read()
with open("/content/text/a2.txt","r+") as a2:
 doc2 = a2.read()
with open("/content/text/a3.txt","r+") as a3:
 doc3 = a3.read()
with open("/content/text/a4.txt","r+") as a4:
 doc4 = a4.read()
with open("/content/text/a5.txt","r+") as a5:
 doc5 = a5.read()
# Read all 5 txt files which contains news articles
documents = [doc1,doc2,doc3,doc4,doc5]

In [None]:
# **a.Find out the top10 TF-IDF words for the above input.**

In [87]:
from sklearn.feature_extraction.text import TfidfVectorizer
import pandas as pd
# using sklearn library which has inbuilt Tfidf vectorizer class which can generate tfidf for given corpus
vect = TfidfVectorizer()
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())
# converted method output to panda data frame 
pd.set_option('display.max_columns', 20)
df.loc['Total'] = df.sum()
# adding row to value total
#filtering values of words whos tfidf is greater than 0.3
# also used transpose function here to filter out words (which was rows) and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

           have       you     price       had        in  administration  \
0      0.088045  0.394401  0.000000  0.000000  0.148133        0.074066   
1      0.000000  0.000000  0.000000  0.177663  0.074728        0.149456   
2      0.097540  0.000000  0.000000  0.000000  0.000000        0.000000   
3      0.199028  0.000000  0.239767  0.099514  0.083714        0.083714   
4      0.000000  0.000000  0.218510  0.181383  0.152585        0.152585   
Total  0.384613  0.394401  0.458277  0.458561  0.459160        0.459822   

             of        to      that       the  
0      0.088045  0.088045  0.074066  0.187934  
1      0.000000  0.000000  0.149456  0.379227  
2      0.195080  0.097540  0.000000  0.277602  
3      0.000000  0.298542  0.167429  0.283220  
4      0.181383  0.000000  0.152585  0.129056  
Total  0.464508  0.484127  0.543536  1.257040  


In [None]:
# **`b.Find out the top10 TF-IDF words for the lemmatized input`**

In [89]:
import nltk;nltk.download('punkt');nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()
words1 = nltk.word_tokenize(doc1)
words2 = nltk.word_tokenize(doc2)
words3 = nltk.word_tokenize(doc3)
words4 = nltk.word_tokenize(doc4)
words5 = nltk.word_tokenize(doc5)
lemmatized_document1 = ' '.join([lemmatizer.lemmatize(w) for w in words1])
lemmatized_document2 = ' '.join([lemmatizer.lemmatize(w) for w in words2])
lemmatized_document3 = ' '.join([lemmatizer.lemmatize(w) for w in words3])
lemmatized_document4 = ' '.join([lemmatizer.lemmatize(w) for w in words4])
lemmatized_document5 = ' '.join([lemmatizer.lemmatize(w) for w in words5])
documents = [lemmatized_document1,lemmatized_document2,lemmatized_document3,lemmatized_document4,lemmatized_document5]
        
# using sklearn library which has inbuilt Tfidf vectorizer class which can generate tfidf for given corpus\r\n",
vect = TfidfVectorizer()
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())
# converted method output to panda data frame 
        
df.loc['Total'] = df.sum() 
# adding row to value total
        
#filtering values of words whos tfidf is greater than 0.3
# also used transpose function here to filter out words (which was rows) and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
           have       you     price        in        of       had  \
0      0.088045  0.394401  0.000000  0.148133  0.088045  0.000000   
1      0.000000  0.000000  0.000000  0.077762  0.000000  0.184877   
2      0.097903  0.000000  0.000000  0.000000  0.195807  0.000000   
3      0.199028  0.000000  0.239767  0.083714  0.000000  0.099514   
4      0.000000  0.000000  0.218510  0.152585  0.181383  0.181383   
Total  0.384977  0.394401  0.458277  0.462194  0.465235  0.465775   

       administration        to      that       the  
0            0.074066  0.088045  0.074066  0.187934  
1            0.155525  0.000000  0.155525  0.394625  
2            0.000000  0.097903  0.000000  0.278636  
3            0.083714  0.298542  0.167429  0.283220  
4            0.15258

In [None]:
# **c.Find out the top10TF-IDF words for the n-gram based input.**

In [107]:
# this function takes document and n  int value to generate list of n grams
def ngrams(input, n):
  input=input.split(' ')
  output= []
  for i in range(len(input)-n+1): 
      output.append(input[i:i+n])
  return output
        
ngram_doc1 = ' '.join([' '.join(x) for x in ngrams(doc1, 3)])
ngram_doc2 = ' '.join([' '.join(x) for x in ngrams(doc2, 3)])
ngram_doc3 = ' '.join([' '.join(x) for x in ngrams(doc3, 3)])
ngram_doc4 = ' '.join([' '.join(x) for x in ngrams(doc4, 3)])
ngram_doc5 = ' '.join([' '.join(x) for x in ngrams(doc5, 3)])
        
# documents = [ngram_doc1,ngram_doc2,ngram_doc3,ngram_doc4,ngram_doc5]
  
documents = [doc1,doc2,doc3,doc4,doc5]
        
# using sklearn library which has inbuilt Tfidf vectorizer class which can generate tfidf for given corpus
vect = TfidfVectorizer( ngram_range=(3,3)) 
# TfidfVectorizer has inbuilt ngram kwarg which show tfidf for ngrams
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())
# converted method output to panda data frame 
        
df.loc['Total'] = df.sum() 
# adding row to value total
        
#filtering values of words whos tfidf is greater than 0.3
# also used transpose function here to filter out words (which was rows) and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

       under the trump  reporter matt lee  that all of  lee piped in  \
0             0.000000           0.000000     0.000000      0.000000   
1             0.000000           0.000000     0.000000      0.000000   
2             0.000000           0.000000     0.000000      0.000000   
3             0.000000           0.000000     0.000000      0.000000   
4             0.238022           0.238022     0.238022      0.238022   
Total         0.238022           0.238022     0.238022      0.238022   

       piped in telling  price that all  this work had  press reporter matt  \
0              0.000000        0.000000       0.000000             0.000000   
1              0.000000        0.000000       0.000000             0.000000   
2              0.000000        0.000000       0.000000             0.000000   
3              0.000000        0.000000       0.000000             0.000000   
4              0.238022        0.238022       0.238022             0.238022   
Total          0.2380

In [None]:
# **2.Write a simple spark program to read a dataset and find the W2V similar words (words with higher cosine similarity) for the Top10 TF-IDF Words**

In [108]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec
# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()
        
documentData = spark.createDataFrame([
                 (0.0, doc1),
                 (0.1, doc2),
                 (0.2, doc3),
                 (0.3, doc4),
                (0.5, doc5)
            ], ["label", "document"])
        
# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsData = tokenizer.transform(documentData)
print (documentData)
wordsData.show()

DataFrame[label: double, document: string]
+-----+--------------------+--------------------+
|label|            document|               words|
+-----+--------------------+--------------------+
|  0.0|"You guys have on...|["you, guys, have...|
|  0.1|Senior Republican...|[senior, republic...|
|  0.2|Kremlin critics h...|[kremlin, critics...|
|  0.3|At Monday’s press...|[at, monday’s, pr...|
|  0.5|Associated Press ...|[associated, pres...|
+-----+--------------------+--------------------+



In [114]:
# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=200)
tf = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
# calculating the IDF
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results
tfidf.select("label", "features").show()
print("TF-IDF without NLP:")
for each in tfidf.collect():
 print(each)
print(each['rawFeatures'])
spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(200,[5,6,8,16,17...|
|  0.1|(200,[5,8,16,17,2...|
|  0.2|(200,[7,11,16,17,...|
|  0.3|(200,[3,6,8,16,17...|
|  0.5|(200,[17,36,56,65...|
+-----+--------------------+

TF-IDF without NLP:
Row(label=0.0, document='"You guys have only been in office for a month, right? Are you telling me that in the last four weeks these 18 companies all of the sudden decided to say, ‘Oh my God! We better not doing anything with Nord Stream 2," Lee said. "You guys are taking credit for stuff the previous administration did. Yes or no?"', words=['"you', 'guys', 'have', 'only', 'been', 'in', 'office', 'for', 'a', 'month,', 'right?', 'are', 'you', 'telling', 'me', 'that', 'in', 'the', 'last', 'four', 'weeks', 'these', '18', 'companies', 'all', 'of', 'the', 'sudden', 'decided', 'to', 'say,', '‘oh', 'my', 'god!', 'we', 'better', 'not', 'doing', 'anything', 'with', 'nord', 'stream', '2,"', 'lee', 'said.', '"you', 'guy

In [None]:
# **b.Try with Lemmatization**

In [118]:
import nltk;nltk.download('punkt');nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

words1 = nltk.word_tokenize(doc1)
words2 = nltk.word_tokenize(doc2)
words3 = nltk.word_tokenize(doc3)
words4 = nltk.word_tokenize(doc4)
words5 = nltk.word_tokenize(doc5)
        
lemmatized_document1 = ' '.join([lemmatizer.lemmatize(w) for w in words1])
lemmatized_document2 = ' '.join([lemmatizer.lemmatize(w) for w in words2])
lemmatized_document3 = ' '.join([lemmatizer.lemmatize(w) for w in words3])
lemmatized_document4 = ' '.join([lemmatizer.lemmatize(w) for w in words4])
lemmatized_document5 = ' '.join([lemmatizer.lemmatize(w) for w in words5])

### lemmatizing words from 5 input docs same as previos task
        
# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()
        
documentData = spark.createDataFrame([
              (0.0, lemmatized_document1),
              (0.1, lemmatized_document2),
              (0.2, lemmatized_document3),
              (0.3, lemmatized_document4),
              (0.5, lemmatized_document5)
          ], ["label", "document"])
        
# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsData = tokenizer.transform(documentData)
print (documentData)
wordsData.show()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
DataFrame[label: double, document: string]
+-----+--------------------+--------------------+
|label|            document|               words|
+-----+--------------------+--------------------+
|  0.0|`` You guy have o...|[``, you, guy, ha...|
|  0.1|Senior Republican...|[senior, republic...|
|  0.2|Kremlin critic ha...|[kremlin, critic,...|
|  0.3|At Monday ’ s pre...|[at, monday, ’, s...|
|  0.5|Associated Press ...|[associated, pres...|
+-----+--------------------+--------------------+



In [120]:
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()
documentData = spark.createDataFrame([
         (0.0, doc1.split(' ')),
         (0.1, doc2.split(' ')),
         (0.2, doc3.split(' ')),
         (0.3, doc4.split(' ')),
         (0.4, doc5.split(' '))
          ], ["label", "document"])
        
        
ngram = NGram(n=2, inputCol="document", outputCol="ngrams")
ngramDataFrame = ngram.transform(documentData)
# applying tf on the words data
hashingTF = HashingTF(inputCol="ngrams", outputCol="rawFeatures", numFeatures=200)
tf = hashingTF.transform(ngramDataFrame)
# alternatively, CountVectorizer can also be used to get term frequency vectors
# calculating the IDF
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results
tfidf.select("label", "features").show()
        
        
print("TF-IDF with ngram:")
for each in tfidf.collect():
 print(each)
print(each['rawFeatures'])
spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(200,[2,3,7,17,24...|
|  0.1|(200,[0,1,2,3,4,8...|
|  0.2|(200,[1,11,13,15,...|
|  0.3|(200,[1,2,7,8,14,...|
|  0.4|(200,[18,26,30,42...|
+-----+--------------------+

TF-IDF with ngram:
Row(label=0.0, document=['"You', 'guys', 'have', 'only', 'been', 'in', 'office', 'for', 'a', 'month,', 'right?', 'Are', 'you', 'telling', 'me', 'that', 'in', 'the', 'last', 'four', 'weeks', 'these', '18', 'companies', 'all', 'of', 'the', 'sudden', 'decided', 'to', 'say,', '‘Oh', 'my', 'God!', 'We', 'better', 'not', 'doing', 'anything', 'with', 'Nord', 'Stream', '2,"', 'Lee', 'said.', '"You', 'guys', 'are', 'taking', 'credit', 'for', 'stuff', 'the', 'previous', 'administration', 'did.', 'Yes', 'or', 'no?"'], ngrams=['"You guys', 'guys have', 'have only', 'only been', 'been in', 'in office', 'office for', 'for a', 'a month,', 'month, right?', 'right? Are', 'Are you', 'you telling', 'telling me', 'me that', 'that