Install required packages

In [None]:
# List the Dataproc clusters in us-central1, then install the Python packages
# used below. google-cloud-storage is pinned; graphframes is not.
!gcloud dataproc clusters list --region us-central1
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
# List the files under Spark's jars directory whose names start with "graph".
!ls -l /usr/lib/spark/jars/graph*

Import the required libraries

In [None]:

import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
import numpy as np
from google.cloud import storage
import math
import hashlib
def _hash(s):
    """Return the hex form of a 5-byte BLAKE2b digest of the string `s`.

    The string is encoded as UTF-8 before hashing; the result is a
    10-character hexadecimal string.
    """
    payload = bytes(s, encoding='utf8')
    hasher = hashlib.blake2b(payload, digest_size=5)
    return hasher.hexdigest()

# Download the NLTK 'stopwords' corpus; stopwords.words('english') is read from it below.
nltk.download('stopwords')

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf,SparkFiles


In [None]:
# Take the `spark` session object exported by pyspark.shell.
from pyspark.shell import spark

# Show the session as this cell's output.
spark

Create the connection to our bucket in GCP and collect the paths of the input files

In [None]:
# Bucket that holds the input files. The indexes built later in this notebook
# are uploaded to the same bucket under the 'postings_gcp/' prefix.
bucket_name = '316139070_206204588'
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    # Skip everything that is not input data: the graphframes.sh helper script
    # and any index output already written under postings_gcp/. Without the
    # second check a re-run would hand the index files to spark.read.parquet.
    if b.name == 'graphframes.sh' or b.name.startswith('postings_gcp/'):
        continue
    paths.append(full_path+b.name)

In [None]:
# Read all collected bucket objects as a single DataFrame.
# NOTE(review): the same read is repeated in the "Extract data" cell below.
parquetFile = spark.read.parquet(*paths)

In [None]:
# Change to /home/dataproc and check that inverted_index_gcp.py is present there.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

In [None]:
# `sys` was previously available only as a side effect of a wildcard import;
# import it explicitly so this cell does not depend on that.
import sys

from pyspark.shell import sc

# Register the index module with the SparkContext so it is distributed with the job.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
# Put the SparkFiles root directory first on sys.path so the module can be imported here.
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
# Import MultiFile and InvertedIndex classes
from inverted_index_gcp import *

Extract data from wikipedia

In [None]:
# Read the input files (same read as the earlier cell).
parquetFile = spark.read.parquet(*paths)

# One RDD per field, each keeping the document id next to the field value.
doc_text_pairs = parquetFile.select("id", "text").rdd
doc_title_pairs = parquetFile.select("id","title").rdd
# The anchor_text column is kept in its raw form here and rearranged in the next cell.
anchor_text_as_is= parquetFile.select("id","anchor_text").rdd

In [None]:
# Rearrange the anchor data into (doc_id, anchor text) pairs.
# Each row's anchor_text value is flattened into its items, which are treated
# as (key, text) pairs (presumably keyed by the linked document's id - confirm
# against the dataset schema). The texts are grouped by key and joined into one
# space-separated string per key.
doc_anchor_pairs = (
    anchor_text_as_is
    .flatMap(lambda row: row[1])
    .groupByKey()
    .mapValues(lambda texts: " ".join(texts))
)

Define the stopwords as the union of the English and corpus-specific stopwords

In [None]:
# `re` was previously available only as a side effect of a wildcard import;
# import it explicitly so this cell does not depend on that.
import re

# English stopwords from NLTK.
english_stopwords = frozenset(stopwords.words('english'))
# Additional words treated as stopwords for this corpus.
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)

# Token pattern: one leading character (#, @ or a word character) followed by
# 2 to 24 more word characters, each optionally preceded by ' or -.
# Tokens therefore contain between 3 and 25 word characters.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE) # regular expression

def tokenize(text,gate):
    """Split `text` into lower-cased tokens matched by RE_WORD.

    Parameters:
    -----------
      text: str
        Text to tokenize.
      gate: bool
        When truthy, tokens found in `all_stopwords` are dropped.
    Returns:
    --------
      list of str
        The tokens in order of appearance.
    """
    lowered = text.lower()
    found = [match.group() for match in RE_WORD.finditer(lowered)]
    if not gate:
        return found
    # gate is on: return the tokens WITHOUT stopwords
    return [word for word in found if word not in all_stopwords]

# Number of buckets the posting lists are partitioned into.
NUM_BUCKETS = 124
def token2bucket_id(token):
  """Map `token` to a bucket id in the range [0, NUM_BUCKETS)."""
  hex_digest = _hash(token)
  # Interpret the hex digest as an integer and fold it into the bucket range.
  return int(hex_digest, 16) % NUM_BUCKETS
def word_count(text, id, remove_stopword):
    ''' Count the term frequency (tf) of every non-stopword token in `text`
    and return the entries that go into the posting lists.

    NOTE(review): tokens found in `all_stopwords` are always skipped here, so
    passing remove_stopword=False has no effect on the result. Confirm that
    this is what the callers that pass False expect.
    Parameters:
    -----------
      text: str
        Text of one document
      id: int
        Document id
      remove_stopword: bool
        Forwarded to `tokenize` as its stopword gate.
    Returns:
    --------
      List of tuples
        A list of (token, (doc_id, tf)) pairs, in order of first appearance,
        for example: [("anarchism", (12, 5)), ...]
    '''
    frequencies = {}
    for tok in tokenize(text, remove_stopword):
        if tok in all_stopwords:
            continue
        frequencies[tok] = frequencies.get(tok, 0) + 1
    return [(tok, (id, tf)) for tok, tf in frequencies.items()]

def doc_to_term_counter(text, exclude_stopword):
  '''
  Build a Counter of the tokens in one document.
  `exclude_stopword` is forwarded to `tokenize` as its stopword gate.
  '''
  return Counter(tokenize(text, exclude_stopword))


Functions used during the building of the index

In [None]:
def reduce_word_counts(unsorted_pl):
  ''' Order a posting list by wiki_id (ascending).
  Parameters:
  -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) tuples in arbitrary order
  Returns:
  --------
    list of tuples
      A new list with the same tuples sorted by their first element; entries
      with equal wiki_id keep their relative order.
  '''
  posting_list = list(unsorted_pl)
  posting_list.sort(key=lambda entry: entry[0])
  return posting_list

def calculate_df(postings):
  ''' Compute the document frequency (df) of every token in a postings RDD.
  Parameters:
  -----------
    postings: RDD
      Each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      Each element is a (token, df) pair, where df is the length of the
      token's posting list.
  '''
  def token_with_df(entry):
    # entry = ("token", [(doc_id1, tf1), (doc_id2, tf2), ...])
    return (entry[0], len(entry[1]))

  return postings.map(token_with_df)

In [None]:
def calc_document_len(text, id,remove_stopwords):
    """Return (id, number of tokens in `text`).

    `remove_stopwords` is forwarded to `tokenize` as its stopword gate, so it
    decides whether stopwords are counted in the length.
    """
    return (id, len(tokenize(text, remove_stopwords)))

In [None]:
def partition_postings_and_write(postings,  storage_path):
  ''' Partition the posting lists into buckets and write each bucket out.

  Every (token, posting_list) pair is keyed by `token2bucket_id(token)`, the
  pairs are grouped per bucket, and each (bucket_id, pairs) group is passed to
  `InvertedIndex.write_a_posting_list` together with the global `bucket_name`
  and `storage_path`.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    storage_path: str
      Passed through unchanged to `write_a_posting_list`.
  Returns:
  --------
    RDD
      One item per bucket: the value returned by `write_a_posting_list` for
      that bucket. See `write_a_posting_list` for its exact meaning.
  '''
  keyed_by_bucket = postings.map(lambda entry: (token2bucket_id(entry[0]), (entry[0], entry[1])))
  grouped = keyed_by_bucket.groupByKey()
  return grouped.map(lambda bucket: InvertedIndex.write_a_posting_list(bucket, bucket_name, storage_path))


In [None]:
# Per-document word counts for each field: every document yields
# (token, (doc_id, tf)) entries. The flag is word_count's stopword gate
# (True for the body, False for title and anchor).
word_counts_text = doc_text_pairs.flatMap(lambda x: word_count(x[1], x[0], True))
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count(x[1], x[0], False))
word_counts_anchor = doc_anchor_pairs.flatMap(lambda x: word_count(x[1], x[0], False))

# Group by term to build one posting list per term: (doc_id, tf) pairs sorted
# by doc_id (reduce_word_counts sorts on the first tuple element).
postings_text = word_counts_text.groupByKey().mapValues(reduce_word_counts)
postings_title = word_counts_title.groupByKey().mapValues(reduce_word_counts)
postings_anchor = word_counts_anchor.groupByKey().mapValues(reduce_word_counts)

# Body only: keep terms whose posting list has more than 50 entries,
# i.e. terms that appear in more than 50 documents.
postings_filtered_text = postings_text.filter(lambda x: len(x[1])>50)


Calculate the document frequency for the text (body), title and anchor

In [None]:
# (token, df) RDDs per field; the body uses the filtered postings, so it only
# covers terms that passed the posting-list length filter.
w2df_text = calculate_df(postings_filtered_text)
w2df_title = calculate_df(postings_title)
w2df_anchor = calculate_df(postings_anchor)

Calculate the following for the title in order to create the title index

In [None]:
# token -> df for titles, collected to the driver as a dict.
w2df_dict_title = w2df_title.collectAsMap()

print('complete 1')
# doc_id -> number of title tokens (stopword gate off, so stopwords are counted).
document_len_title= doc_title_pairs.map(lambda tup: calc_document_len(tup[1],tup[0],False)).collectAsMap()
print('complete 2')
# Write the title posting lists bucket by bucket; collect() triggers the job.
posting_locs_list_title = partition_postings_and_write(postings_title,"title_index").collect()
print('complete 3')
# (doc_id, Counter of title tokens) with the stopword gate on. This is a lazy
# RDD: nothing is computed until the collectAsMap() below.
title_token_counter_by_doc_id = doc_title_pairs.map(lambda pair: (pair[0], doc_to_term_counter(pair[1],True)))
print('complete 4')

# doc_id -> Euclidean norm of the title's tf-idf vector, where for each term
#   tf  = count of the term in the title / title length
#   idf = log10(N / df), N = number of titles (len(document_len_title))
# A term missing from the df dict falls back to df = N, i.e. a weight of 0.
doc_id_to_norm_title = title_token_counter_by_doc_id.map(lambda docid_counter: (docid_counter[0], np.linalg.norm([(docid_counter[1][term] / document_len_title[docid_counter[0]]) * math.log(len(document_len_title) / w2df_dict_title.get(term, len(document_len_title)), 10) for term in docid_counter[1]]))).collectAsMap()


In [None]:
# doc_id -> title, collected to the driver; stored in every index below.
doc_title_pairs_dict=doc_title_pairs.collectAsMap()

Create the title index

In [None]:
# Merge the posting-location dictionaries of all buckets into one dictionary.
super_posting_locs_title = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/title_index'):
  # Only the ".pickle" objects under this prefix are read; everything else is skipped.
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    # NOTE(review): pickle.load can execute arbitrary code; only safe while
    # these objects are produced by this pipeline (presumably by
    # InvertedIndex.write_a_posting_list - confirm).
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs_title[k].extend(v)
# Create inverted index instance
inverted_title = InvertedIndex()
# Attach the merged posting locations dictionary to the inverted index
inverted_title.posting_locs = super_posting_locs_title

# Attach the token -> df dictionary
inverted_title.df = w2df_dict_title

# Attach the doc_id -> title length dictionary
inverted_title.document_len = document_len_title

# Attach the doc_id -> document title dictionary
inverted_title.doc_id_title = doc_title_pairs_dict

# Attach the doc_id -> norm dictionary computed earlier. For each document the
# norm is taken over the tf-idf values of its terms:
#   step 1: compute tf-idf for each term of the document, where
#             tf  = frequency of term i in doc j divided by the length of doc j
#             idf = log base 10 of (number of docs in the corpus divided by
#                   the number of docs that term i appears in)
#   step 2: compute the norm of the document from all of its tf-idf values
#   step 3: repeat for every document
#   step 4: build a dictionary with key = doc_id and value = norm
inverted_title.doc_id_to_norm = doc_id_to_norm_title

# Write the index to the current directory as title_index.pkl
inverted_title.write_index('.', 'title_index')
# Upload the index file to the bucket under postings_gcp/
index_src = "title_index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

Calculate the following for the anchor text in order to create the anchor index

In [None]:
# token -> df for anchor text, collected to the driver as a dict.
w2df_dict_anchor = w2df_anchor.collectAsMap()

print('complete 1')

# doc_id -> number of anchor-text tokens (stopword gate off, so stopwords are counted).
document_len_anchor= doc_anchor_pairs.map(lambda tup: calc_document_len(tup[1],tup[0],False)).collectAsMap()

print('complete 2')

# Write the anchor posting lists bucket by bucket; collect() triggers the job.
posting_locs_list_anchor = partition_postings_and_write(postings_anchor,"anchor_index").collect()

print('complete 3')

# (doc_id, Counter of anchor tokens) with the stopword gate on (lazy RDD).
anchor_token_counter_by_doc_id = doc_anchor_pairs.map(lambda pair: (pair[0], doc_to_term_counter(pair[1],True)))

print('complete 4')
# Keep only documents whose anchor-text length is greater than 0.
anchor_token_counter_by_doc_id = anchor_token_counter_by_doc_id.filter(lambda x: document_len_anchor[x[0]] != 0) # keep only document in len > 0

# doc_id -> Euclidean norm of the anchor tf-idf vector, where for each term
#   tf  = count of the term / anchor-text length (1 if the doc id is missing)
#   idf = log10(N / df), N = len(document_len_anchor)
# A term missing from the df dict falls back to df = N, i.e. a weight of 0.
doc_id_to_norm_anchor = anchor_token_counter_by_doc_id.map(
    lambda docid_counter: (docid_counter[0], np.linalg.norm([(docid_counter[1][term] / document_len_anchor.get(docid_counter[0],1)) * math.log(len(document_len_anchor) / w2df_dict_anchor.get(term, len(document_len_anchor)), 10) for term in docid_counter[1]]))).collectAsMap()


Create the anchor index and write it to our bucket in GCP

In [None]:
# Merge the posting-location dictionaries of all buckets into one dictionary.
super_posting_locs_anchor = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/anchor_index'):
  # Only the ".pickle" objects under this prefix are read; everything else is skipped.
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    # NOTE(review): pickle.load can execute arbitrary code; only safe while
    # these objects are produced by this pipeline (presumably by
    # InvertedIndex.write_a_posting_list - confirm).
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs_anchor[k].extend(v)
# Create inverted index instance
inverted_anchor = InvertedIndex()
# Attach the merged posting locations dictionary to the inverted index
inverted_anchor.posting_locs = super_posting_locs_anchor

# Attach the token -> df dictionary
inverted_anchor.df = w2df_dict_anchor

# Attach the doc_id -> anchor-text length dictionary
inverted_anchor.document_len = document_len_anchor

# Attach the doc_id -> document title dictionary
inverted_anchor.doc_id_title = doc_title_pairs_dict

# Attach the doc_id -> norm dictionary computed earlier. For each document the
# norm is taken over the tf-idf values of its terms:
#   step 1: compute tf-idf for each term of the document, where
#             tf  = frequency of term i in doc j divided by the length of doc j
#             idf = log base 10 of (number of docs in the corpus divided by
#                   the number of docs that term i appears in)
#   step 2: compute the norm of the document from all of its tf-idf values
#   step 3: repeat for every document
#   step 4: build a dictionary with key = doc_id and value = norm

inverted_anchor.doc_id_to_norm = doc_id_to_norm_anchor

# Write the index to the current directory as anchor_index.pkl
inverted_anchor.write_index('.', 'anchor_index')
# Upload the index file to the bucket under postings_gcp/
index_src = "anchor_index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

Calculate the following for the text (body) in order to create the text index

In [None]:
# token -> df for the body text (filtered terms only), collected to the driver as a dict.
w2df_dict_text = w2df_text.collectAsMap()

print('complete 1')

# doc_id -> number of body tokens with the stopword gate on.
document_len_text= doc_text_pairs.map(lambda tup: calc_document_len(tup[1],tup[0],True)).collectAsMap()

print('complete 2')

# Tokens are hashed into NUM_BUCKETS (124) buckets in each index.
# Write the filtered body posting lists bucket by bucket; collect() triggers the job.
posting_locs_list_text = partition_postings_and_write(postings_filtered_text,"text_index").collect()

print('complete 3')

# (doc_id, Counter of body tokens) with the stopword gate on (lazy RDD).
text_token_counter_by_doc_id = doc_text_pairs.map(lambda pair: (pair[0], doc_to_term_counter(pair[1],True)))

In [None]:
# For each document, compute the norm of all tf-idf values in that document:
#   step 1: compute tf-idf for each term of the document, where
#             tf  = frequency of term i in doc j divided by the length of doc j
#             idf = log base 10 of (number of docs in the corpus divided by
#                   the number of docs that term i appears in)
#           (a term missing from the df dict falls back to df = number of
#            docs, i.e. a weight of 0)
#   step 2: compute the Euclidean norm of the document from all of its tf-idf values
#   step 3: repeat for every document
#   step 4: build a dictionary with key = doc_id and value = norm
doc_id_to_norm_text = text_token_counter_by_doc_id.map(lambda docid_counter: (docid_counter[0], np.linalg.norm([(docid_counter[1][term] / document_len_text[docid_counter[0]]) * math.log(len(document_len_text) / w2df_dict_text.get(term, len(document_len_text)), 10) for term in docid_counter[1]]))).collectAsMap()

Create the text index and write it to our bucket in GCP

In [None]:
# Merge the posting-location dictionaries of all buckets into one dictionary.
super_posting_locs_text = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/text_index'):
  # Only the ".pickle" objects under this prefix are read; everything else is skipped.
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    # NOTE(review): pickle.load can execute arbitrary code; only safe while
    # these objects are produced by this pipeline (presumably by
    # InvertedIndex.write_a_posting_list - confirm).
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs_text[k].extend(v)

# Create inverted index instance
inverted_text = InvertedIndex()
# Attach the merged posting locations dictionary to the inverted index
inverted_text.posting_locs = super_posting_locs_text

# Attach the token -> df dictionary
inverted_text.df = w2df_dict_text

# Attach the doc_id -> body length dictionary
inverted_text.document_len = document_len_text

# Attach the doc_id -> document title dictionary
inverted_text.doc_id_title = doc_title_pairs_dict

# Attach the doc_id -> tf-idf norm dictionary
inverted_text.doc_id_to_norm = doc_id_to_norm_text

# Write the index to the current directory as text_index.pkl
inverted_text.write_index('.', 'text_index')
# Upload the index file to the bucket under postings_gcp/
index_src = "text_index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
# List the most recently uploaded index file (index_dst) with its size.
!gsutil ls -lh $index_dst