In [3]:
# set up the environment
# Install a headless Java 8 JDK, download and unpack the Spark 2.4.4
# (Hadoop 2.7 build) distribution into the current directory, and install
# findspark so that pyspark can be imported from that unpacked distribution.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
# Point the JVM and Spark lookups at the locations installed above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark 
# NOTE(review): this passes a path relative to the working directory, while
# SPARK_HOME above is absolute; the two name the same directory only when
# the notebook runs from /content -- confirm before running elsewhere.
findspark.init("spark-2.4.4-bin-hadoop2.7")# SPARK_HOME

from pyspark import SparkContext
# Reuse the already-running SparkContext if there is one, else start a new one.
sc = SparkContext.getOrCreate()

In [2]:
from google.colab import drive
# Mount Google Drive at /content/drive; the input documents are read from it
# and the final index files are copied back to it.
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
import nltk
# Fetch the NLTK 'stopwords' corpus; it is loaded later through
# nltk.corpus.stopwords to filter tokens.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

# Build the index using a document collection

Create an RDD from a directory of text files

Each file in the directory becomes one element of the RDD: a (file path, file content) pair.

In [4]:
# sc.wholeTextFiles yields one (path, content) pair per file:
#   key   = full path of the file
#   value = entire text of the file
# i.e. [(wholepath, wholetxt), ...]
# (earlier input location: "/content/drive/My\ Drive/input_docs")
input = sc.wholeTextFiles("/content/drive/My Drive/input_doc_all/")

# Replace each key by the numeric document id taken from the file's basename,
# e.g. 'file:/content/drive/My Drive/Colab Notebooks/data_spark/input_docs/3.html'
# is keyed as the integer 3.
import os
from bs4 import BeautifulSoup

input2 = input.map(
    lambda path_and_text: (
        int(os.path.basename(path_and_text[0]).partition(".")[0]),
        path_and_text[1],
    )
)

In [5]:
input2.take(1)

[(16807, 'What province is Galkayo located in?')]

# Create RDD of (word, (docid, frqc, tf))

In [None]:
# Doc to wordlist function
# The output will be a list of tuples such as 
# ("apple", (3,10,10/20)), 
# where 3 is docid, 
# 10 is frequency of "apple" in this doc, 
# 20 is maxf in this doc.

# from bs4 import BeautifulSoup
from collections import Counter
import re

from nltk.corpus import stopwords
# English stop words held in a set, which dw() uses for membership tests
# when filtering tokens.
stop_words = set(stopwords.words('english'))

# for a given doc return a list of tuples of the form (w, (docid, freq, freq/maxfreq))
def dw(docid, htmltext):
  """Turn one document into its per-word postings.

  Args:
    docid: identifier of the document; copied unchanged into every posting.
    htmltext: raw text of the document (may contain HTML markup).

  Returns:
    A dict mapping each distinct non-stop-word ``w`` to the tuple
    ``(docid, freq, freq / maxfreq)``, where ``freq`` is the number of
    occurrences of ``w`` in the document and ``maxfreq`` is the count of
    the most frequent remaining word. The dict is empty when no word
    survives tokenization and stop-word removal.
  """
  # Strip markup and lowercase so that matching is case-insensitive.
  cleantext = BeautifulSoup(htmltext).get_text().lower()
  # Tokens are whole words made only of a-z; words that also contain
  # digits or underscores do not match and are skipped.
  tokenizedText = re.findall(r"\b[a-z]+\b", cleantext)

  # Count the occurrences of every word that is not a stop word.
  tokenCnt = Counter(w for w in tokenizedText if w not in stop_words)

  # An empty or stop-word-only document has nothing to index; returning
  # early avoids indexing into an empty most_common() result.
  if not tokenCnt:
    return {}

  maxf = tokenCnt.most_common(1)[0][1]

  # One entry per distinct word. Iterating the counter instead of the token
  # list avoids rebuilding the same entry for every repeated occurrence.
  return {w: (docid, frqc, frqc / maxf) for w, frqc in tokenCnt.items()}
# Run dw() exactly once per document and emit its (word, (docid, freq, tf))
# items; looking each word up through a fresh dw() call would re-parse the
# whole document once per distinct word.
word_docid_freq_tf = input2.flatMap(lambda x: list(dw(x[0], x[1]).items()))

Now create an RDD as follows 

e.g. (word, [(did1,freq1,tf1), (did2,freq2,tf2), ...])

In [None]:
# Gather every posting of a word into a single list:
# (word, [(did1, freq1, tf1), (did2, freq2, tf2), ...])
word_docid_freq_tf_2 = word_docid_freq_tf.groupByKey().mapValues(list)
print(word_docid_freq_tf_2.take(2))

In [None]:
# The idf factor used here is 1 / (number of postings of the word), so each
# posting's tf is divided by the length of the word's posting list.
word_docid_freq_tfidf = word_docid_freq_tf_2.map(
    lambda entry: (
        entry[0],
        [(did, freq, tf / len(entry[1])) for did, freq, tf in entry[1]],
    )
)
print(word_docid_freq_tfidf.take(2))

In [None]:
# Goal: for every document, its maximum word frequency and its squared
# magnitude (the sum of squared tf-idf weights).
# The word itself is not needed for this, only each posting's
# (did, freq, tfidf); the per-document values come from a reduceByKey.
import math
#TODO

# Step 1: one record per posting -> (word, (did, freq, tfidf)).
did_freq_tfidf = word_docid_freq_tfidf.flatMapValues(lambda postings: postings)
print(did_freq_tfidf.take(2))

# Step 2: drop the word and key by document -> (did, (freq, tfidf squared)).
did_freq_tfidf = did_freq_tfidf.map(
    lambda rec: (rec[1][0], (rec[1][1], rec[1][2] * rec[1][2]))
)
print(did_freq_tfidf.take(2))

# Step 3: per document, keep the largest freq and add up the squared weights.
doc_maxf_mag = did_freq_tfidf.reduceByKey(
    lambda left, right: (max(left[0], right[0]), left[1] + right[1])
)
print(doc_maxf_mag.take(2))


[('solo', (15, 1, 0.058823529411764705)), ('solo', (10, 1, 0.058823529411764705))]
[(15, (1, 0.0034602076124567475)), (10, (1, 0.0034602076124567475))]
[(10, (1, 0.0038591326840581855)), (14, (1, 0.0035248396701107894))]


# Save doc2mag and index

In [None]:
# Remove any previous output directory first, then write the whole index
# as a single partition (one part file) under inv_idx/.
!rm -rf inv_idx
word_docid_freq_tfidf.repartition(1).saveAsTextFile("inv_idx");

In [None]:
# Remove any previous output directory first, then write the per-document
# (maxfreq, squared magnitude) records as a single partition under doc_mag/.
!rm -rf doc_mag
doc_maxf_mag.repartition(1).saveAsTextFile("doc_mag");

In [None]:
# List the saved index directory, copy its single part file to Google Drive
# as inv_idx.txt, and count the lines of the copy.
!ls -lrt inv_idx
#!head inv_idx/part-00001
#!wc -l inv_idx/part-00000
#!wc -l inv_idx/part-00001
!cat inv_idx/part-00000 > /content/drive/My\ Drive/inv_idx.txt
!wc -l /content/drive/My\ Drive/inv_idx.txt

total 2792
-rw-r--r-- 1 root root 2855593 Jul 14 23:12 part-00000
-rw-r--r-- 1 root root       0 Jul 14 23:12 _SUCCESS
12136 /content/drive/My Drive/inv_idx.txt


In [None]:
!ls -lrt doc_mag
!head doc_mag/part-00000
!wc -l doc_mag/part-00000
!wc -l doc_mag/part-00001
!cat doc_mag/part-00000 > /content/drive/My\ Drive/doc_mag.txt
!wc -l /content/drive/My\ Drive/doc_mag.txt

total 580
-rw-r--r-- 1 root root 591624 Jul 14 23:12 part-00000
-rw-r--r-- 1 root root      0 Jul 14 23:12 _SUCCESS
(10, (1, 0.0038591326840581855))
(14, (1, 0.0035248396701107894))
(2, (1, 0.006177163001462405))
(12, (1, 0.006342785118225462))
(130, (1, 0.005674244508596622))
(134, (1, 0.008530274174313483))
(4764, (1, 0.004658267184246462))
(68, (1, 0.002070250129949385))
(102, (1, 0.06403653109524758))
(104, (1, 0.06804660087133721))
17807 doc_mag/part-00000
wc: doc_mag/part-00001: No such file or directory
17807 /content/drive/My Drive/doc_mag.txt
