***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!

In [1]:
# if the following command generates an error, you probably didn't enable 
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-db4c  GCE       3                                             RUNNING  us-central1-a


# Imports & Setup

In [3]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [4]:
pip install nltk

[0mNote: you may need to restart the kernel to use updated packages.


In [5]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
from operator import add
from nltk.stem import PorterStemmer
import builtins

import math

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [6]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar 11 14:01 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [7]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [8]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = 'nettaya-315443382' 
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name.endswith('.parquet'):
        paths.append(full_path+b.name)

In [9]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [10]:
from inverted_index_gcp import *

In [11]:
parquetFile = spark.read.parquet(*paths)
doc_body_pairs = parquetFile.select("id", "text").rdd
inverted_body = InvertedIndex()

                                                                                

In [12]:
porter = PorterStemmer()

In [13]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)


def word_count(id, text):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs
      for example: [("Anarchism", (12, 5)), ...]
    '''
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  tokens = [x for x in tokens if x not in all_stopwords]
  tokens = [porter.stem(x) for x in tokens]
  tuples = []
  tf_dict = Counter(tokens)
  res = []
  [res.append(x) for x in tokens if x not in res]
  for t in res:
    tuples.append((t,(id,tf_dict[t])))
  return tuples

def count_doc_len(doc_id, text):
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  tokens = [x for x in tokens if x not in all_stopwords]
  tokens = [porter.stem(x) for x in tokens]
  return (doc_id,len(tokens))

def tf_for_term_id(doc_id, text):
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  tokens = [x for x in tokens if x not in all_stopwords]
  tuples = []
  tokens = [porter.stem(x) for x in tokens]
  tf_dict = Counter(tokens)
  res = []
  [res.append(x) for x in tokens if x not in res]
  for t in res:
    tuples.append((t,tf_dict[t]))
  return tuples

def reduce_word_counts(unsorted_pl):
  ''' Returns a sorted posting list by wiki_id.
  Parameters:
  -----------
    unsorted_pl: list of tuples
      A list of (wiki_id, tf) tuples
  Returns:
  --------
    list of tuples
      A sorted posting list.
  '''
  sorted_pl = sorted(unsorted_pl, key=lambda x: x[0])
  return sorted_pl

def calculate_df(postings):
  ''' Takes a posting list RDD and calculate the df for each token.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each element is a (token, df) pair.
  '''
  token_df = postings.map(lambda x: (x[0], len(x[1])))
  return token_df



In [None]:
# word_counts_body = doc_body_pairs.flatMap(lambda x: word_count(x[0], x[1]))
# postings_body = word_counts_body.groupByKey().mapValues(reduce_word_counts)
# postings_filtered_body = postings_body.filter(lambda x: len(x[1])>50)

In [None]:
len_docs_body = doc_body_pairs.map(lambda x: count_doc_len(x[0], x[1]))
len_docs_body = len_docs_body.collectAsMap()
inverted_body.DL = len_docs_body

In [None]:
# write the global stats out
inverted_body.write_index('.', 'index_body_DL')
# !ls -l index_body.pkl
# upload to gs
index_src = "index_body_DL.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp_body_DL/{index_src}'
!gsutil cp index_body_DL.pkl gs://nettadl-315443382/postings_gcp_body_DL/index_body_DL.pkl

In [None]:
NUM_BUCKETS = 124
def token2bucket_id(token):
  return int(_hash(token),16) % NUM_BUCKETS

def partition_postings_and_write(postings,base_dir):
  ''' A function that partitions the posting lists into buckets, writes out
  all posting lists in a bucket to disk, and returns the posting locations for
  each bucket. Partitioning should be done through the use of `token2bucket`
  above. Writing to disk should use the function  `write_a_posting_list`, a
  static method implemented in inverted_index_colab.py under the InvertedIndex
  class.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and
      offsets its posting list was written to. See `write_a_posting_list` for
      more details.
  '''
  bucket_name = "nettaya-315443382"

 # Assume postings is an RDD or a similar collection where each item is a (word, posting list) pair
 # First, transform postings into (bucket_id, (word, posting list)) for grouping using your `token2bucket_id` logic
  postings_by_bucket = postings.map(lambda x: (token2bucket_id(x[0]), x))

  # Group postings by bucket_id and prepare them for writing
  postings_grouped = postings_by_bucket.groupByKey().mapValues(list)

  # For each bucket, write postings to disk and collect location info
  postings_locations = postings_grouped.map(lambda x: InvertedIndex.write_a_posting_list(x, base_dir, bucket_name))

  return postings_locations

In [None]:
postings_locs_body = partition_postings_and_write(postings_filtered_body,'text').collect()

In [14]:
# collect all posting lists locations into one super-set
super_posting_locs_body = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='text'):
    if not blob.name.endswith(".pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_body[k].extend(v)

In [15]:
inverted_body.posting_locs = super_posting_locs_body

In [16]:
# write the global stats out
inverted_body.write_index('.', 'index_body_posting_locs')
# !ls -l index_body.pkl
# upload to gs
index_src = "index_body_posting_locs.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp_body_posting_locs/{index_src}'
!gsutil cp index_body_posting_locs.pkl gs://nettaya-315443382/postings_gcp_body_posting_locs/index_body_posting_locs.pkl

Copying file://index_body_posting_locs.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 10.6 MiB/ 10.6 MiB]                                                
Operation completed over 1 objects/10.6 MiB.                                     


In [None]:
# Add the token - df dictionary to the inverted index
# inverted_body.df = w2df_body_dict
# Adding the posting locations dictionary to the inverted index

In [None]:
# def calculate_tfidf_doc(tf_term_doc, index):
#     DL = index.DL
#     N = len(DL)
#     df_dict = index.df
#     doc_term_tfidf = tf_term_doc.map(lambda x : (x[0] , math.sqrt(builtins.sum([math.pow((tf/DL[x[0]]) * (math.log(N/df_dict[term],10)),2) for term, tf in x[1] if term in df_dict and x[0] in index.DL]))))
#     return doc_term_tfidf

In [None]:
# tf_term_doc_body = doc_text_pairs.map(lambda x : (x[0], tf_for_term_id(x[0], x[1])))
# tfidf_body = calculate_tfidf_doc(tf_term_doc_body, inverted_body)
# tfidf_dict_body = tfidf_body.collectAsMap()

In [None]:
# total_terms_body = postings_filtered_body.flatMapValues(lambda x : x).map(lambda x: (x[0],x[1][1])).reduceByKey(add)
# inverted_body.term_total = total_terms_body.collectAsMap()

In [None]:
# w2df_body = calculate_df(postings_filtered_body)
# w2df_body_dict = w2df_body.collectAsMap()

In [None]:
# write_pickle(inverted_body.DL, "body_DL",bucket_name,"body" )

In [None]:
# write_pickle(inverted_body.term_total, "body_term_total",bucket_name,"body")

In [None]:
# write_pickle(inverted_body.vec_len_doc, "vec_len_total",bucket_name,"body")