***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!

In [1]:
# if the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

NAME     PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
ir-proj  GCE       2                                             RUNNING  us-central1-a


# Imports & Setup

In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [29]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import threading
from concurrent.futures import ThreadPoolExecutor

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar 10 17:19 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [6]:
spark

In [7]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = '318964772'
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name != 'graphframes.sh' and b.name.startswith("multi"):
        paths.append(full_path+b.name)

***GCP setup is complete!*** If you got here without any errors you've earned 10 out of the 35 points of this part.

# Create Page Views

In [8]:
# Paths
# Using user page views (as opposed to spiders and automated traffic) for the
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path)
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB)
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly
# total number of page views (5). Then, remove lines with article id or page
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the pages views for the same
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
  for line in f:
    parts = line.split(' ')
    wid2pv.update({int(parts[0]): int(parts[1])})
# write out the counter as binary file (pickle it)
with open(pv_clean, 'wb') as f:
  pickle.dump(wid2pv, f)
# read in the counter
with open(pv_clean, 'rb') as f:
  wid2pv = pickle.loads(f.read())

--2024-03-09 17:25:32--  https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2
Resolving dumps.wikimedia.org (dumps.wikimedia.org)... 208.80.154.71, 2620:0:861:3:208:80:154:71
Connecting to dumps.wikimedia.org (dumps.wikimedia.org)|208.80.154.71|:443... connected.
HTTP request sent, awaiting response... 304 Not Modified
File ‘pageviews-202108-user.bz2’ not modified on server. Omitting download.



In [10]:
# with open(pv_clean, 'rb') as f:
#   wid2pv = pickle.loads(f.read())

In [9]:
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('page_views.pkl')
blob.upload_from_string(pickle.dumps(wid2pv))

In [8]:
bucket_name = '318964772'
file_path = 'page_views.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
page_views = pickle.loads(contents)

# Building an inverted index

Here, we read the entire corpus to an rdd, directly from Google Storage Bucket and use your code from Colab to construct an inverted index.

In [9]:
parquetFile = spark.read.parquet(*paths)
doc_title_pairs = parquetFile.select("title", "id").rdd

                                                                                

We will count the number of pages to make sure we are looking at the entire corpus. The number of pages should be more than 6M

In [10]:
# Count number of wiki pages
parquetFile.count()

                                                                                

6348910

Let's import the inverted index module. Note that you need to use the staff-provided version called `inverted_index_gcp.py`, which contains helper functions to writing and reading the posting files similar to the Colab version, but with writing done to a Google Cloud Storage bucket.

In [11]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [12]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [13]:
from inverted_index_gcp import InvertedIndex

In [13]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
    return int(_hash(token),16) % NUM_BUCKETS

# PLACE YOUR CODE HERE
def word_count(text, id):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs
      for example: [("Anarchism", (12, 5)), ...]
  '''
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  term_count  = {}
  for token in tokens:
    if token not in all_stopwords:
      if token not in term_count:
        term_count[token]= 1
      else:
        term_count[token] +=1
  result = list(map(lambda x: (x, (id, term_count[x])), term_count.keys()))
  return result

def reduce_word_counts(unsorted_pl):
  ''' Returns a sorted posting list by wiki_id.
  Parameters:
  -----------
    unsorted_pl: list of tuples
      A list of (wiki_id, tf) tuples
  Returns:
  --------
    list of tuples
      A sorted posting list.
  '''
  tf_dict = {}

  # Iterate over the tf values and aggregate them for each wiki_id
  for tup in unsorted_pl:
      if tup[0] in tf_dict:
          tf_dict[tup[0]] += tup[1]
      else:
          tf_dict[tup[0]] = tup[1]

  # Convert the dictionary to a list of tuples
  sorted_pl = sorted(tf_dict.items(), key=lambda x: x[0])
  return sorted_pl
def calculate_df(postings):
  ''' Takes a posting list RDD and calculate the df for each token.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each element is a (token, df) pair.
  '''
  # YOUR CODE HERE
  df_pairs = postings.map(lambda x: (x[0], len(x[1])))
  return df_pairs
def partition_postings_and_write(base_dir,postings):
  ''' A function that partitions the posting lists into buckets, writes out
  all posting lists in a bucket to disk, and returns the posting locations for
  each bucket. Partitioning should be done through the use of `token2bucket`
  above. Writing to disk should use the function  `write_a_posting_list`, a
  static method implemented in inverted_index_colab.py under the InvertedIndex
  class.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and
      offsets its posting list was written to. See `write_a_posting_list` for
      more details.
  '''
  # YOUR CODE HERE
  posting_locations = postings.map(lambda x: (token2bucket_id(x[0]),[(x[0],x[1])]))
  posting_locations = posting_locations.reduceByKey(lambda p1,p2 : p1+p2)
  posting_locations = posting_locations.map(lambda x: InvertedIndex.write_a_posting_list(x,base_dir,bucket_name='318964772'))
  return posting_locations

# Build body inverted index no stemming

In [None]:
# time the index creation time
t_start = time()
# word counts map
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered).collect()
index_const_time = time() - t_start

In [None]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [None]:
# Create inverted index instance
inverted = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'index')
# upload to gs
index_src = "index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
!gsutil ls -lh $index_dst

# Build title inverted index no stemming

In [90]:
# time the index creation time
t_start = time()
# word counts map
word_counts = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
# postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(base_dir ='title_InvertedIndex' ,postings =postings).collect()
index_const_time = time() - t_start

                                                                                

In [16]:
# test index construction time
assert index_const_time < 60*120


In [91]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='title_InvertedIndex'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

Putting it all together

In [92]:
# Create inverted index instance
inverted = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'title_InvertedIndex')
# upload to gs
index_src = "title_InvertedIndex.pkl"
index_dst = f'gs://{bucket_name}/title_InvertedIndex/{index_src}'
!gsutil cp $index_src $index_dst

Copying file://title_InvertedIndex.pkl [Content-Type=application/octet-stream]...
- [1 files][ 67.6 MiB/ 67.6 MiB]                                                
Operation completed over 1 objects/67.6 MiB.                                     


In [93]:
!gsutil ls -lh $index_dst

 67.59 MiB  2024-03-04T21:33:56Z  gs://318964772/title_InvertedIndex/title_InvertedIndex.pkl
TOTAL: 1 objects, 70874019 bytes (67.59 MiB)


# Build Inverted Index with stemming

In [None]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)
stemmer = PorterStemmer()
NUM_BUCKETS = 124
def token2bucket_id(token):
    return int(_hash(token),16) % NUM_BUCKETS

# PLACE YOUR CODE HERE
def word_count(text, id):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs
      for example: [("Anarchism", (12, 5)), ...]
  '''
  tokens = [stemmer.stem(token.group().lower()) for token in RE_WORD.finditer(text.lower())]
  term_count  = {}
  for token in tokens:
    if token not in all_stopwords:
      if token not in term_count:
        term_count[token]= 1
      else:
        term_count[token] +=1
  result = list(map(lambda x: (x, (id, term_count[x])), term_count.keys()))
  return result

def reduce_word_counts(unsorted_pl):
  ''' Returns a sorted posting list by wiki_id.
  Parameters:
  -----------
    unsorted_pl: list of tuples
      A list of (wiki_id, tf) tuples
  Returns:
  --------
    list of tuples
      A sorted posting list.
  '''
  tf_dict = {}

  # Iterate over the tf values and aggregate them for each wiki_id
  for tup in unsorted_pl:
      if tup[0] in tf_dict:
          tf_dict[tup[0]] += tup[1]
      else:
          tf_dict[tup[0]] = tup[1]

  # Convert the dictionary to a list of tuples
  sorted_pl = sorted(tf_dict.items(), key=lambda x: x[0])
  return sorted_pl
def calculate_df(postings):
  ''' Takes a posting list RDD and calculate the df for each token.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each element is a (token, df) pair.
  '''
  # YOUR CODE HERE
  df_pairs = postings.map(lambda x: (x[0], len(x[1])))
  return df_pairs
def partition_postings_and_write(base_dir,postings):
  ''' A function that partitions the posting lists into buckets, writes out
  all posting lists in a bucket to disk, and returns the posting locations for
  each bucket. Partitioning should be done through the use of `token2bucket`
  above. Writing to disk should use the function  `write_a_posting_list`, a
  static method implemented in inverted_index_colab.py under the InvertedIndex
  class.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and
      offsets its posting list was written to. See `write_a_posting_list` for
      more details.
  '''
  # YOUR CODE HERE
  posting_locations = postings.map(lambda x: (token2bucket_id(x[0]),[(x[0],x[1])]))
  posting_locations = posting_locations.reduceByKey(lambda p1,p2 : p1+p2)
  posting_locations = posting_locations.map(lambda x: InvertedIndex.write_a_posting_list(x,base_dir,bucket_name='318964772'))
  return posting_locations

24/03/06 09:12:20 WARN TaskSetManager: Lost task 2.2 in stage 12.0 (TID 695) (ir-project-yz-w-0.c.ass3-ir-413709.internal executor 5): TaskKilled (Stage cancelled)
24/03/06 09:12:20 WARN TaskSetManager: Lost task 9.1 in stage 12.0 (TID 696) (ir-project-yz-w-1.c.ass3-ir-413709.internal executor 12): TaskKilled (Stage cancelled)
24/03/06 09:12:21 WARN TaskSetManager: Lost task 10.1 in stage 12.0 (TID 698) (ir-project-yz-w-1.c.ass3-ir-413709.internal executor 11): TaskKilled (Stage cancelled)


# Build body inverted index with stemming

In [19]:
# take the 'text' and 'id' or the first 1000 rows and create an RDD from it
parquetFile = spark.read.parquet(*paths)
doc_text_pairs = parquetFile.select("text", "id").rdd

                                                                                

In [19]:
# time the index creation time
t_start = time()
# word counts map
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(base_dir ='body_inverted_index_stem', postings= postings_filtered).collect()
index_const_time = time() - t_start

In [16]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='body_inverted_index_stem'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [20]:
# Create inverted index instance
inverted = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'index')
# upload to gs
index_src = "index.pkl"
index_dst = f'gs://{bucket_name}/body_inverted_index_stem/{index_src}'
!gsutil cp $index_src $index_dst

# PageRank

In [None]:
# Put your `generate_graph` function here
def generate_graph(pages):
  ''' Compute the directed graph generated by wiki links.
  Parameters:
  -----------
    pages: RDD
      An RDD where each row consists of one wikipedia articles with 'id' and
      'anchor_text'.
  Returns:
  --------
    edges: RDD
      An RDD where each row represents an edge in the directed graph created by
      the wikipedia links. The first entry should the source page id and the
      second entry is the destination page id. No duplicates should be present.
    vertices: RDD
      An RDD where each row represents a vetrix (node) in the directed graph
      created by the wikipedia links. No duplicates should be present.
  '''
  # YOUR CODE HERE
  edges = pages.flatMap(lambda page: [(page[0], anchor['id']) for anchor in page[1]])
  vertices = pages.flatMap(lambda page: [(page[0],)] + [(anchor['id'],) for anchor in page[1]])
  # Remove duplicates
  edges = edges.distinct()
  vertices = vertices.distinct()

  return edges, vertices

In [52]:
t_start = time()
pages_links = parquetFile.select ("id","anchor_text").rdd
# construct the graph
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())
pr.repartition(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")
pr_time = time() - t_start
pr.show()

# Calculations 

In [13]:
parquetFile = spark.read.parquet(*paths)
doc_title_pairs = parquetFile.select("title", "id").rdd

                                                                                

In [14]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ['category', 'references', 'also', 'links', 'extenal', 'see', 'thumb']
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)
all_stopwords = english_stopwords.union(corpus_stopwords)

# Calculations for body index

# Extract pickle files from my bucket

In [47]:
bucket_name = '318964772'
file_path = 'postings_gcp/index.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
body_index = pickle.loads(contents)

In [15]:
file_path = 'calculations/docid_title_pairs.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
doc_title_pairs = pickle.loads(contents)

In [16]:
file_path = 'calculations/docid_body_length.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
doc_lengths = pickle.loads(contents)

In [18]:
file_path = 'calculations/word_idf_dict.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
idf = pickle.loads(contents)

In [19]:
file_path = 'calculations/doc_nf.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
normalization_factor_per_doc = pickle.loads(contents)

In [35]:
# normalization_factor_per_doc

# Create doc title pairs ({doc_id:title})

In [29]:
import math
###############dont run again, already in the bucket
doc_title_pairs = parquetFile.select("id", "title").rdd
doc_title_pairs = doc_title_pairs.collectAsMap()

# Upload doc title pairs to my bucket

In [100]:
###############dont run again, already in the bucket
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('docid_title_pairs.pkl')
blob.upload_from_string(pickle.dumps(doc_title_pairs))

# Calculate the document length - {docid:length}

In [27]:
# bucket = client.get_bucket(bucket_name)
# blob = bucket.blob('docid_text_pairs.pkl')
# blob.upload_from_string(pickle.dumps(doc_text_pairs))

In [109]:
###############dont run again, already in the bucket
doc_lengths = doc_text_pairs.map(lambda x: (x[1], len(tokenize(x[0]))))
doc_lengths = doc_lengths.collectAsMap()

                                                                                

# Insert the length to the bucket

In [112]:
###############dont run again, already in the bucket
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('docid_body_length.pkl')
blob.upload_from_string(pickle.dumps(doc_lengths))

# Calculate the whole corpus length

In [27]:
###############dont run again, already in the bucket
corpus_length = doc_text_pairs.count()

                                                                                

In [34]:
###############dont run again, already in the bucket
import math
idf = {}
for word in body_index.df.keys(): 
                                                              # base 10
    idf[word] = math.log((corpus_length/body_index.df[word]), 10)

In [17]:
filter_func = lambda tok: tok not in all_stopwords
tokenize = lambda text: [token.group() for token in RE_WORD.finditer(text.lower()) if token not in all_stopwords]

In [37]:
###############dont run again, already in the bucket
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('word_idf_dict.pkl')
blob.upload_from_string(pickle.dumps(idf))

In [38]:
###############dont run again, already in the bucket
# rdd in the format of (docid, Counter[word1: word1freq, word2:word2freq...])
tf_idf_per_doc = doc_text_pairs.map(lambda x: (x[1], Counter(tokenize(x[0]))))
# rdd in the format of (docid, [(word1, word_1_tf_idf),(word2,word_2_tf_idf)...])
tf_idf_per_doc = tf_idf_per_doc.map(lambda x: (x[0], [(word, ((freq*idf[word])/doc_lengths[x[0]])) for word, freq in x[1].items() if word in idf.keys()]))

In [43]:
###############dont run again, already in the bucket
def sum_rdd_values(d):
    total = 0
    for _, value in d:
        total += (value)**2
    return total
normalization_factor_per_doc = tf_idf_per_doc.map(lambda x: (x[0], sum_rdd_values(x[1]))).collectAsMap()


                                                                                

In [44]:
###############dont run again, already in the bucket
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('doc_nf.pkl')
blob.upload_from_string(pickle.dumps(normalization_factor_per_doc))


# Calculations for body inverted index with stemming

In [18]:
bucket_name = '318964772'
file_path = 'body_index_with_stem/body_index_with_stem.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
body_index_stem = pickle.loads(contents)

In [19]:
bucket_name = '318964772'
file_path = 'docid_body_length_stem.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
doc_lengths_stem = pickle.loads(contents)

In [20]:
bucket_name = '318964772'
file_path = 'word_idf_dict_stem.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
idf_stem = pickle.loads(contents)

In [21]:
import math
###############dont run again, already in the bucket
parquetFile = spark.read.parquet(*paths)
doc_title_pairs_stem = parquetFile.select("title", "id").rdd
doc_title_pairs_stem = doc_title_pairs_stem.collectAsMap()

                                                                                

In [22]:
###############dont run again, already in the bucket
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('docid_title_pairs_stem.pkl')
blob.upload_from_string(pickle.dumps(doc_title_pairs_stem))

In [24]:
parquetFile = spark.read.parquet(*paths)
doc_text_pairs_stem = parquetFile.select("text", "id").rdd

                                                                                

In [26]:
###############dont run again, already in the bucket
doc_lengths_stem = doc_text_pairs_stem.map(lambda x: (x[1], len(tokenize(x[0]))))
doc_lengths_stem = doc_lengths_stem.collectAsMap()

                                                                                

In [27]:
###############dont run again, already in the bucket
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('docid_body_length_stem.pkl')
blob.upload_from_string(pickle.dumps(doc_lengths_stem))

In [25]:
###############dont run again, already in the bucket
corpus_length_stem = doc_text_pairs_stem.count()

                                                                                

In [23]:
filter_func = lambda tok: tok not in all_stopwords
tokenize = lambda text: [token.group() for token in RE_WORD.finditer(text.lower()) if token not in all_stopwords]

In [27]:
###############dont run again, already in the bucket
import math
idf_stem = {}
for word in body_index_stem.df.keys(): 
                                                              # base 10
    idf_stem[word] = math.log((corpus_length_stem/body_index_stem.df[word]), 10)

In [28]:
###############dont run again, already in the bucket
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('word_idf_dict_stem.pkl')
blob.upload_from_string(pickle.dumps(idf_stem))

In [40]:
###############dont run again, already in the bucket
# rdd in the format of (docid, Counter[word1: word1freq, word2:word2freq...])
stemmer = PorterStemmer()
tf_idf_per_doc_stem = doc_text_pairs_stem.map(lambda x: (x[1], Counter([stemmer.stem(token)for token in tokenize(x[0])])))
# rdd in the format of (docid, [(word1, word_1_tf_idf),(word2,word_2_tf_idf)...])
tf_idf_per_doc_stem = tf_idf_per_doc_stem.map(lambda x: (x[0], [(word, ((freq*idf_stem[word])/doc_lengths_stem[x[0]])) for word, freq in x[1].items() if word in idf_stem.keys()]))

In [53]:
###############dont run again, already in the bucket
def sum_rdd_values(d):
    total = 0
    for _, value in d:
        total += (value)**2
    return total
normalization_factor_per_doc_stem = tf_idf_per_doc_stem.map(lambda x: (x[0], sum_rdd_values(x[1]))).collectAsMap()

In [33]:
# normalization_factor_per_doc_stem

In [None]:
###############dont run again, already in the bucket
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('doc_nf_stem.pkl')
blob.upload_from_string(pickle.dumps(normalization_factor_per_doc_stem))


 # Calculations for title index

# Extract pickle files from our bucket

In [22]:
bucket_name = '318964772'
file_path = 'title_InvertedIndex/title_InvertedIndex.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
title_index = pickle.loads(contents)



In [23]:

file_path = 'calculations/docid_title_pairs.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
doc_title_pairs = pickle.loads(contents)


In [24]:
file_path = 'calculations_title_index/title_idf_dict.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
title_idf = pickle.loads(contents)


In [21]:
file_path = 'calculations_title_index/docid_title_length.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
title_lengths = pickle.loads(contents)

In [26]:
file_path = 'calculations_title_index/title_nf.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
normalization_factor_per_title = pickle.loads(contents)

In [None]:
# title_lengths = doc_title_pairs.map(lambda x: (x[1], len(tokenize(x[0]))))
# title_lengths = title_lengths.collectAsMap()


In [27]:
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('docid_title_length.pkl')
blob.upload_from_string(pickle.dumps(title_lengths))

In [19]:
corpus_length = doc_title_pairs.count()

                                                                                

In [25]:
import math
title_idf = {}
for word in title_index.df.keys(): 
                                                              # base 10
    title_idf[word] = math.log((6348910/title_index.df[word]), 10)

In [26]:
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('title_idf_dict.pkl')
blob.upload_from_string(pickle.dumps(title_idf))


In [34]:
tf_idf_per_title = doc_title_pairs.map(lambda x: (x[1], Counter(tokenize(x[0]))))
tf_idf_per_title.collect()

# rdd in the format of (docid, [(word1, word_1_tf_idf)....])
tf_idf_per_title = tf_idf_per_title.map(lambda x: (x[0], [(word, ((freq*title_idf[word])/title_lengths[x[0]])) for word, freq in x[1].items() if word in title_idf.keys()]))

                                                                                

In [37]:
def sum_rdd_values(d):
    total = 0
    for _, value in d:
        total += (value)**2
    return total
normalization_factor_per_title = tf_idf_per_title.map(lambda x: (x[0], sum_rdd_values(x[1]))).collectAsMap()

                                                                                

In [38]:
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('title_nf.pkl')
blob.upload_from_string(pickle.dumps(normalization_factor_per_title))


# Calculations for title index with stemming

In [22]:
bucket_name = '318964772'
file_path = 'title_index/title_index.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
title_index_stem = pickle.loads(contents)

In [23]:
bucket_name = '318964772'
file_path = 'title_idf_dict_stem.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
title_idf_stem = pickle.loads(contents)

In [24]:
stemmer = PorterStemmer()
title_idf_stem[stemmer.stem('telecommunications')]

4.0245479203227115

# Import page rank from the bucket as calculate in ass3

In [13]:
import gzip
import io
import csv
bucket_name = '318964772'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
file_path = 'pr/part-00000-3a7c6cc8-cc26-426b-9412-b9e1dfdccd59-c000.csv.gz'
blob = bucket.blob(file_path)
blob_data = blob.download_as_string()
# Decompress the gzip data
decompressed_data = gzip.decompress(blob_data)
# Decode the decompressed data as text
decompressed_text = decompressed_data.decode('utf-8')
# Create a StringIO object to treat the text as a file-like object
csv_data = io.StringIO(decompressed_text)
# Read the CSV data
csv_reader = csv.reader(csv_data)
page_rank_rdd = sc.parallelize(list(csv_reader))
page_rank_dict = {int(row[0]): float(row[1]) for row in page_rank_rdd.collect()}
# Convert RDD to dictionary (map)
# page_rank_dict = page_rank_rdd.collectAsMap()
    

24/03/05 11:35:07 WARN TaskSetManager: Stage 5 contains a task of very large size (55784 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [14]:
bucket = client.get_bucket(bucket_name)
blob = bucket.blob('page_rank.pkl')
blob.upload_from_string(pickle.dumps(page_rank_dict))


In [24]:
bucket_name = '318964772'
file_path = 'page_rank.pkl'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
page_rank_dict = pickle.loads(contents)


In [25]:
print(page_rank_dict[30635])

98.77823111423265
