In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")# SPARK_HOME


from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
# Put input_docs_sample.zip in your Google Drive

!rm -rf input_docs
!cp /content/drive/My\ Drive/input_docs.zip .
!unzip input_docs.zip > /dev/null
!ls input_docs/ | wc -l #folder in input_docs_sample

# for the real collection change above input_docs_sample.zip to input_docs.zip
# for the sample collection of 5 docs, the process is fast
# for the real collection, the process takes about 6 min (start to finish, the whole notebook) 

19026


In [0]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

**Create an RDD from a text file**

Each line of the text file becomes an element of the RDD.

In [0]:
# wholeTextFiles generates an RDD of pair values, 
# where the key is the full path of each file, the value is the content of each file
#input = sc.wholeTextFiles("/content/drive/My\ Drive/input_docs");
input = sc.wholeTextFiles("input_docs");

# Now we strip the prefix of filenames and leave only the basename. 
# e.g. 'file:/content/drive/My Drive/Colab Notebooks/data_spark/input_docs/3.html'
# becomes '3.html' 
import os

#(did,text)
input2 = input.map(lambda x: (int(os.path.basename(x[0]).split(".")[0]), x[1]))

print(input2.take(2))

[(12959, "<H2> 3-APR-1987 11:55:12.42</H2>\r\n<H2>POLYSAR IN JOINT VENTURE ON FINLAND LATEX PLANT</H2>\r\nPolysar Ltd, wholly owned by\nCanada Development Corp, said it agreed to form a joint\nventure with Raision Tehtaat, of Finland, to build a synthetic\nrubber latex plant in southern Finland.\n    Project cost and plant capacity were not disclosed.\n    The joint venture, to be 51 pct owned by Raision Tehtaat\nand 49 pct by Polysar, will build a plant at Anjalankoski, east\nof Helsinki, to produce carboxylated styrene-butadiene latex.\n    The plant's production will be used by Finland's paper\nindustry for making coated paper and paper board products.\n \n "), (9696, "<H2>25-MAR-1987 23:10:06.55</H2>\r\n<H2>CHINA, PORTUGAL INITIAL MACAO PACT</H2>\r\nChina and Portugal today initialled a\njoint declaration under which the 400-year-old colony of Macao\nwill be handed over to Peking on December 20, 1999, the\nofficial New China News Agency reported.\n    Portuguese Prime Minister Anib

In [0]:
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [0]:
from bs4 import BeautifulSoup
import collections
from collections import Counter
import re

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
stem = PorterStemmer()

### <pre>Cleaning all the documents, removing stop words</pre>

In [0]:
# Doc to wordlist function
# The output will be a list of tuples such as 
# ("apple", (3,10,10/20)), 
# where 3 is docid, 
# 10 is frequency of "apple" in this doc, 
# 20 is maxf in in this doc.
%xmode Verbose
stop_words = set(stopwords.words('english'))

# for a given doc return a list of tuples of the form (w, (docid, freq, freq/maxfreq))
def dw(docid, htmltext):

  htmltext = htmltext.lower() #convert into lowercase
  cleantext = BeautifulSoup(htmltext).get_text()
  
  cleantext = re.sub("\d+", " ", cleantext) #remove digits
  cleantext = re.sub('[^A-Za-z0-9]+', ' ', cleantext) #remove special characters
  cleantext = re.sub("\s+", " ", cleantext) #remove extra spaces
  
  cleantext = word_tokenize(cleantext)
  cleantext = [word for word in cleantext if word not in stopwords.words('english')]
  
  output = [];
  counter = Counter(cleantext)
  maxf = Counter(cleantext).most_common(1)[0][1]
  for ele in counter:
    tuple1 = (ele);
    tuple2 = (int(docid),counter[ele],counter[ele]/maxf)
    tuple3 = (tuple1, tuple2)
    output.append(tuple3);
  return(output)

word_docid_freq_tf = input2.flatMap(lambda x: dw(x[0],x[1]))
print(word_docid_freq_tf.take(10))

Exception reporting mode: Verbose
[('apr', (12959, 1, 0.2)), ('polysar', (12959, 3, 0.6)), ('joint', (12959, 3, 0.6)), ('venture', (12959, 3, 0.6)), ('finland', (12959, 4, 0.8)), ('latex', (12959, 3, 0.6)), ('plant', (12959, 5, 1.0)), ('ltd', (12959, 1, 0.2)), ('wholly', (12959, 1, 0.2)), ('owned', (12959, 2, 0.4))]


![alt text](https://)Expected output (all expected results are on the small sample):

<pre>
[('feb', (1, 1, 0.07142857142857142)), ('bahia', (1, 5, 0.35714285714285715))]
</pre>

In [0]:
%xmode Verbose
#combining all the words and documents ids frequency
word_list_freq_tf = word_docid_freq_tf.map(lambda x: (x[0],[x[1]])).reduceByKey(lambda x,y: x+y)
# Now create an RDD as follows 
# (word, [(did1,freq1,tf1), (did2,freq2,tf2), ...])
word_postinglist_freq_tf = sc.parallelize(word_list_freq_tf.collect())
print(word_postinglist_freq_tf.count())

Exception reporting mode: Verbose


In [0]:
print(word_postinglist_freq_tf.map(lambda x : (x[0], list(x[1]))).take(2))

[('apr', [(12959, 1, 0.2), (16400, 1, 0.125), (14357, 1, 0.125), (15275, 1, 0.2), (12027, 1, 0.3333333333333333), (15216, 1, 0.25), (13929, 1, 0.2), (13100, 1, 0.25), (15168, 1, 0.16666666666666666), (17042, 1, 0.16666666666666666), (16253, 1, 0.3333333333333333), (13044, 1, 0.2), (15200, 1, 0.16666666666666666), (11847, 1, 0.1111111111111111), (15613, 1, 0.3333333333333333), (16098, 1, 0.3333333333333333), (16906, 1, 0.16666666666666666), (17301, 1, 0.125), (16087, 1, 0.05263157894736842), (12844, 1, 0.3333333333333333), (17030, 1, 0.08333333333333333), (16056, 1, 0.3333333333333333), (16756, 1, 0.05263157894736842), (17380, 1, 0.14285714285714285), (14166, 1, 0.5), (13608, 2, 0.3333333333333333), (15528, 1, 0.5), (16205, 1, 0.125), (14906, 1, 0.14285714285714285), (15423, 1, 0.25), (14331, 1, 0.1), (16146, 1, 0.1), (16778, 1, 0.16666666666666666), (12804, 1, 0.25), (12205, 1, 0.25), (13045, 1, 0.25), (17353, 1, 0.14285714285714285), (16231, 1, 0.3333333333333333), (15507, 1, 0.25), (

Expected output

<pre>
[('feb', [(1, 1, 0.07142857142857142), (2, 1, 0.2), (5, 1, 0.16666666666666666), (3, 1, 0.3333333333333333), (4, 1, 0.07142857142857142)]), ('bahia', [(1, 5, 0.35714285714285715)])]
</pre>

## Calculating IDF, to find TFIDF for every word in the document

In [0]:
# (word, [(did,freq,tfidf), ...])
# idf = 1/len(postinglist_tf)
%xmode Verbose
%time
import math
from decimal import Decimal
with_tfidf = []
for row in word_postinglist_freq_tf.collect():
  idf = 1/len(row[1]) #present in number of documents
  backto_tuple = []
  for eachrow in row[1]:
    list_row = list(eachrow) #convert to list
    # list_row[2] = float(format(float(list_row[2]*idf), '.5f')) #tf*idf
    tfidf_val = Decimal((list_row[2]*idf)) #tf*idf
    list_row[2] ="{:.18f}".format(float(tfidf_val))
    backto_tuple.append(tuple(list_row))

  eachtuple = (row[0], backto_tuple)
  with_tfidf.append(eachtuple);
# print(with_tfidf[0])
# creating a RDD
word_postinglist_freq_tfidf = sc.parallelize(with_tfidf)

print(word_postinglist_freq_tfidf.take(2))
print(word_postinglist_freq_tfidf.count())

Exception reporting mode: Verbose
CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.72 µs
[('apr', [(12959, 1, '0.000042158516020236'), (16400, 1, '0.000026349072512648'), (14357, 1, '0.000026349072512648'), (15275, 1, '0.000042158516020236'), (12027, 1, '0.000070264193367060'), (15216, 1, '0.000052698145025295'), (13929, 1, '0.000042158516020236'), (13100, 1, '0.000052698145025295'), (15168, 1, '0.000035132096683530'), (17042, 1, '0.000035132096683530'), (16253, 1, '0.000070264193367060'), (13044, 1, '0.000042158516020236'), (15200, 1, '0.000035132096683530'), (11847, 1, '0.000023421397789020'), (15613, 1, '0.000070264193367060'), (16098, 1, '0.000070264193367060'), (16906, 1, '0.000035132096683530'), (17301, 1, '0.000026349072512648'), (16087, 1, '0.000011094346321115'), (12844, 1, '0.000070264193367060'), (17030, 1, '0.000017566048341765'), (16056, 1, '0.000070264193367060'), (16756, 1, '0.000011094346321115'), (17380, 1, '0.000030113225728740'), (14166, 1, '0.00010539629005

Expected output

<pre>
[('feb', [(1, 1, 0.014285714285714285), (2, 1, 0.04), (5, 1, 0.03333333333333333), (3, 1, 0.06666666666666667), (4, 1, 0.014285714285714285)])]
</pre>

## Calculating Magnitude of each document, which will be used for calculating Cosine similarity

In [0]:
# Now, we would like to obtain the magnitude of each doc.
# First, produce (did, (freq,tfidf)) for each word of doc did; 
# We do don't need the word itself, just its (freq,tfidf). 
# Then, do reduceByKey on these tuples and obtain maxfreq and 
# magnitude (squared) for each document. 
%xmode Verbose
import math
from decimal import Decimal

convert_to_list = word_postinglist_freq_tfidf.collect() #make a list
convert_to_list = list(map(list, convert_to_list)) 
new_list = []
for val in convert_to_list: 
  for row in val[1]:
    row = list(row)
    # print(format(Decimal.from_float(0.1), '.17'))
    # squareval = float(format((row[2]*row[2]), '.15'))
    # print(type(row[2]))
    magnitude = Decimal((float(row[2])**2))
    magnitude ="{:.18f}".format(float(magnitude))
    row = (row[0],(row[1],magnitude))
  new_list.append(row)

# RDD of (did,(freq,tfidf)) tuples
# creating a RDD
did_freq_tfidfsq_rdd = sc.parallelize(new_list)

print(did_freq_tfidfsq_rdd.take(2))
new_list1 = did_freq_tfidfsq_rdd.reduceByKey(lambda x,y: (max(x[0],y[0]),(float(x[1])+float(y[1]))))

# Produce (did,(maxf,magnitudesq))
# creating a RDD
doc_maxf_mag = sc.parallelize(new_list1.collect())

print(doc_maxf_mag.count())
print(doc_maxf_mag.take(2))

Exception reporting mode: Verbose
[(13084, (1, '0.000000002777094489')), (20267, (1, '0.000277777777777778'))]
11881
[(13084, (4, 0.027359572714426773)), (10038, (6, 0.01409554160278653))]


Excpected result

<pre>
[(1, (1, 0.0002040816326530612)), (2, (1, 0.0016))]
[(2, (5, 3.894100000000001)), (4, (14, 2.94553429705215))]
</pre>

In [0]:
!rm -rf inv_idx
word_postinglist_freq_tfidf.saveAsTextFile("inv_idx");

In [0]:
!rm -rf doc_mag
doc_maxf_mag.saveAsTextFile("doc_mag");

In [0]:
!ls -lrt inv_idx
!head inv_idx/part-00001
!wc -l inv_idx/part-00000
!wc -l inv_idx/part-00001
!cat inv_idx/part-00000 inv_idx/part-00001 > /content/drive/My\ Drive/inv_idx.txt
!wc -l /content/drive/My\ Drive/inv_idx.txt

total 37808
-rw-r--r-- 1 root root 14456549 Apr  1 03:32 part-00001
-rw-r--r-- 1 root root 24255093 Apr  1 03:32 part-00000
-rw-r--r-- 1 root root        0 Apr  1 03:32 _SUCCESS
('enterprises', [(5155, 1, 0.0009523809523809524), (6112, 1, 0.00043956043956043956), (387, 1, 0.001142857142857143), (16778, 1, 0.0009523809523809524), (10733, 1, 0.0004761904761904762), (10248, 1, 0.0009523809523809524), (10799, 1, 0.0004761904761904762), (8909, 1, 0.0014285714285714286), (19669, 1, 0.0014285714285714286), (6917, 1, 0.005714285714285714), (5150, 1, 0.0005714285714285715), (7237, 1, 0.001142857142857143), (340, 1, 0.00035714285714285714), (20776, 2, 0.002857142857142857), (7183, 1, 0.002857142857142857), (17652, 1, 0.001142857142857143), (6021, 1, 0.0014285714285714286), (20300, 1, 0.00043956043956043956), (4404, 1, 0.0009523809523809524), (9277, 2, 0.0016326530612244896), (8484, 2, 0.0016326530612244896), (19723, 1, 0.0014285714285714286), (6351, 1, 0.0008163265306122448), (14824, 1, 0.000816

In [0]:
!ls -lrt doc_mag
!head doc_mag/part-00000
!wc -l doc_mag/part-00000
!wc -l doc_mag/part-00001
!cat doc_mag/part-00000 doc_mag/part-00001 > /content/drive/My\ Drive/doc_mag.txt
!wc -l /content/drive/My\ Drive/doc_mag.txt

total 356
-rw-r--r-- 1 root root 173357 Apr  1 03:32 part-00001
-rw-r--r-- 1 root root 187283 Apr  1 03:32 part-00000
-rw-r--r-- 1 root root      0 Apr  1 03:32 _SUCCESS
(17140, (6, 0.008196472745010674))
(10758, (2, 0.0076516244830538115))
(2390, (5, 0.08533953812055374))
(17450, (9, 0.05579457800651008))
(16964, (6, 0.04036113962577953))
(14862, (13, 0.008345370083899458))
(9524, (4, 0.022032205778007994))
(3034, (4, 0.03830034493605944))
(20088, (3, 0.10528742412723764))
(220, (4, 0.03908383018696178))
6144 doc_mag/part-00000
5715 doc_mag/part-00001
11859 /content/drive/My Drive/doc_mag.txt
