# Imports & Setup

In [1]:
# Install notebook dependencies: the GCS client (pinned) and GraphFrames.
# NOTE(review): graphframes is unpinned — consider pinning it for reproducible runs.
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [2]:
#import everything
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import math

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch NLTK's stopword corpus; it is read below to build `english_stopwords`.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [3]:
# more imports
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [4]:
# Collect the gs:// URI of every object in the bucket, skipping the
# GraphFrames init script; the parquet files are selected from this list later.
bucket_name = 'ass3_new'
full_path = f"gs://{bucket_name}/"

client = storage.Client()
blobs = client.list_blobs(bucket_name)
paths = [full_path + blob.name for blob in blobs if blob.name != 'graphframes.sh']

# Building an inverted index

Here, we read the entire corpus into an RDD directly from the Google Cloud Storage bucket and use your code from Colab to construct an inverted index.

In [5]:
# get only parquet files
# (the bucket also receives other objects from this notebook, e.g. .pkl and CSV outputs)
parquet_paths = [path for path in paths if path.endswith('.parquet')]
# NOTE(review): if no .parquet objects are found, read.parquet() is called with no paths — presumably raises; confirm
parquetFile = spark.read.parquet(*parquet_paths)
# get id and title of each article
# (this notebook builds the title index, so the "text" fed to word_count is the title)
doc_text_pairs = parquetFile.select("id", "title").rdd

                                                                                

Let's import the inverted index module. Note that you need to use the staff-provided version called `inverted_index_gcp.py`, which contains helper functions for writing and reading the posting files, similar to the Colab version but with writing done to a Google Cloud Storage bucket.

In [6]:
# adding our python module to the cluster:
# sc.addFile distributes inverted_index_gcp.py via SparkFiles, and adding the
# SparkFiles root to sys.path lets the next cell import it on the driver.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [7]:
from inverted_index_gcp import InvertedIndex

In [8]:
# functions to be used (from assignment 3)
# Stopwords: NLTK's English list plus a few corpus-specific Wikipedia terms.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
# Token pattern: a leading '#', '@' or word character followed by 2-24 word
# characters, each optionally preceded by an apostrophe or a hyphen
# (so every matched token is at least 3 characters long).
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124


def token2bucket_id(token):
  """Deterministically map `token` to a bucket index in [0, NUM_BUCKETS).

  The token's hex digest from `_hash` is read as a base-16 integer and
  reduced modulo NUM_BUCKETS.
  """
  digest_value = int(_hash(token), 16)
  return digest_value % NUM_BUCKETS

def word_count(id, text):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.

  Tokens are extracted with `RE_WORD` from the lower-cased text. Stopwords are
  dropped, and the remaining tokens are Porter-stemmed. Stems that are
  themselves stopwords are then dropped as well.

  Parameters:
  -----------
    id: int
      Document id
    text: str or None
      Text of one document (in this notebook: the article title). A missing
      (None) or empty value yields no postings instead of raising.
  Returns:
  --------
    List of tuples
      A list of (stemmed_token, (doc_id, tf)) pairs, one per distinct stem,
      in order of first occurrence.
      for example: [(stem, (12, 5)), ...]
  '''
  # A null title would otherwise raise on `.lower()` and fail the Spark job.
  if not text:
    return []
  stemmer = PorterStemmer()
  raw_tokens = (match.group() for match in RE_WORD.finditer(text.lower()))
  stems = (stemmer.stem(tok) for tok in raw_tokens if tok not in all_stopwords)
  # Stemming can map a non-stopword onto a stopword (e.g. a plural of a corpus
  # stopword), so filter a second time after stemming.
  word_freq = Counter(stem for stem in stems if stem not in all_stopwords)
  # convert counts to (term, (doc_id, tf)) posting entries
  return [(word, (id, freq)) for word, freq in word_freq.items()]

def reduce_word_counts(unsorted_pl):
  ''' Order one term's postings by document id.
  Parameters:
  -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) postings of a single term, in arbitrary order.
  Returns:
  --------
    list of tuples
      A new list with the same postings in ascending wiki_id order
      (postings with equal ids keep their input order).
  '''
  posting_list = list(unsorted_pl)
  posting_list.sort(key=itemgetter(0))
  return posting_list

def calculate_df(postings):
  ''' Derive the document frequency of every token from its posting list.
  Parameters:
  -----------
    postings: RDD
      Elements are (token, posting_list) pairs.
  Returns:
  --------
    RDD
      Elements are (token, df) pairs, where df is the length of the token's
      posting list.
  '''
  def _token_df(entry):
    token, posting_list = entry[0], entry[1]
    return token, len(posting_list)

  return postings.map(_token_df)

import pickle
from google.cloud import storage
from pathlib import Path

def write_to_disk(obj, base_dir, name, bucket_name=None):
    '''
    Pickle `obj` to `<base_dir>/<name>.pkl`, in a GCS bucket or on local disk.

    obj: object being written to disk (must be picklable)
    base_dir: base directory (used as the object-name prefix in a bucket)
    name: name of pkl file, without the ".pkl" extension
    bucket_name: the name of the bucket that object is being written to;
        when falsy, the file is written to the local filesystem instead
    '''
    if bucket_name:
        # The blob name carries the "directory" prefix; nothing is created first.
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(f"{base_dir}/{name}.pkl")
        blob.upload_from_string(pickle.dumps(obj))
    else:
        # Local writes need the directory to exist; create it (and parents)
        # so a fresh base_dir does not fail with FileNotFoundError.
        directory = Path(base_dir)
        directory.mkdir(parents=True, exist_ok=True)
        with open(directory / f'{name}.pkl', 'wb') as f:
            pickle.dump(obj, f)
            
def load_from_bucket(bucket_name, file_path):
    """
    Load and unpickle an object stored in a Google Cloud Storage bucket.
    
    Args:
    - bucket_name: Name of the Google Cloud Storage bucket.
    - file_path: Path to the file within the bucket (e.g. "title_index/df.pkl").
    
    Returns:
    - The unpickled object.

    Security: the downloaded bytes go straight into `pickle.loads`, which can
    execute arbitrary code. Only load objects this project wrote itself
    (e.g. with `write_to_disk`) from a bucket you control.
    """
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(file_path)
    # download_as_bytes replaces the deprecated download_as_string alias.
    contents = blob.download_as_bytes()
    return pickle.loads(contents)

In [9]:
# word counts map: one (token, (doc_id, tf)) entry per distinct token of every title
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
# group the entries per token and sort each posting list by doc_id
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# # filtering postings: keep only tokens whose posting list has more than 50 documents
# (df is computed from postings_filtered with calculate_df, not here)
# NOTE(review): for a title index this threshold drops every term seen in <= 50 titles — confirm intended
postings_filtered = postings.filter(lambda x: len(x[1])>50)

In [2]:
# # ALREADY WROTE TO GCP BUCKET
#w2df = calculate_df(postings_filtered)
#w2df_dict = w2df.collectAsMap()
#len(w2df_dict.keys())
# write df to disk
#write_to_disk(w2df_dict, "title_index", "df", bucket_name)
# Function to calculate doc length
# def doc_len(doc_id, text):
#     words = text.split()  # split text into words
#     return [(doc_id, len(words))]  # return a tuple of (doc_id, word_count)

# # applying doc_len function and flattening the result
# docs_len = doc_text_pairs.flatMap(lambda x: doc_len(x[0], x[1]))

# # collecting the RDD into a dictionary
# docs_len_dict = docs_len.collectAsMap()
# write doc_len to disk
# write_to_disk(docs_len_dict, "title_index", "docs_len", bucket_name)

In [10]:
# load df and docs_len from bucket
# (both dicts were pickled to title_index/ by the commented-out cell above)
file_path = "title_index/df.pkl"
# Load the pickle file: token -> document frequency
w2df_dict = load_from_bucket(bucket_name, file_path)
file_path = "title_index/docs_len.pkl"
# doc_id -> number of whitespace-separated words in the title
docs_len_dict = load_from_bucket(bucket_name, file_path)

In [11]:
# tf-idf per (term, doc) pair; the result was previously written to the bucket
# as CSV (see the commented-out write at the end of this cell).
# get number of documents
N = doc_text_pairs.map(lambda x: x[0]).distinct().count()
# calculate idf: log(N / df) for every term kept in the index
idf = sc.broadcast({term: math.log(N / doc_freq) for term, doc_freq in w2df_dict.items()})
# Broadcast the per-document lengths like idf, instead of capturing the
# large plain dict directly in the task closure.
docs_len_bc = sc.broadcast(docs_len_dict)


def _term_tf_idf(term_postings):
    '''Expand one (term, [(doc_id, tf), ...]) entry into (term, (doc_id, tf_idf))
    pairs, where tf is normalized by the document length before weighting.'''
    term, posting_list = term_postings[0], term_postings[1]
    term_idf = idf.value[term]
    lengths = docs_len_bc.value
    return [(term, (doc_id, (tf / lengths[doc_id]) * term_idf)) for doc_id, tf in posting_list]


# Calculate TF-IDF for each term-document pair
tf_idf = postings_filtered.flatMap(_term_tf_idf)
# Convert the TF-IDF RDD to a DataFrame
# tf_idf_df = tf_idf.map(lambda x: (x[0], x[1][0], x[1][1])).toDF(["term", "doc_id", "tf_idf"])
# Save the DataFrame as a CSV file
# tf_idf_df.write.csv("gs://ass3_new/title_index/tf_idf_normalized.csv") 

                                                                                

In [27]:
# Read the tf-idf CSV back into a DataFrame.
# The CSV was written with DataFrame.write.csv(...) and no header option, so the
# files contain no header row. Reading with header=True would treat the first
# data row of every part file as column names and silently drop it.
# An explicit schema also avoids the extra pass over the data that inferSchema needs.
csv_df = spark.read.csv(
    "gs://ass3_new/title_index/tf_idf_normalized.csv",
    schema="term STRING, doc_id BIGINT, tf_idf DOUBLE",
    header=False,
)

def tfidfByTerm(csv_df, terms):
    ''' Look up the tf-idf postings of the given terms.

    csv_df: DataFrame whose first three columns hold term, doc_id and tf-idf
    terms: list of terms to look up
    Returns a dict mapping each term that has rows in csv_df to a list of
    (doc_id, tf_idf) tuples; terms without rows are absent from the dict.
    '''
    term_column = col(csv_df.columns[0])
    # Filter on the cluster so only matching rows reach the driver.
    matching_rows = csv_df.filter(term_column.isin(terms)).collect()

    postings_by_term = {}
    for record in matching_rows:
        postings_by_term.setdefault(record[0], []).append((record[1], record[2]))
    return postings_by_term

# list of tokens
# NOTE(review): the index stores Porter-stemmed tokens (see word_count), so query
# terms should be stemmed the same way before being looked up.
lst = ["stay", "hello"]
# dict: term -> [(doc_id, tf_idf), ...] for the terms that were found
dictfilter = tfidfByTerm(csv_df, lst)

                                                                                

In [12]:
# calculate root_len_sqr vector of each doc:
# the Euclidean norm of each document's raw-tf vector, sqrt(sum of tf^2),
# over the terms kept in postings_filtered.
# NOTE(review): raw tf is used here, not the tf-idf weights — confirm this is the
# intended normalization.

# The result will be (doc_id, tf^2) tuples
doc_tf_pairs = postings_filtered.flatMap(lambda term_postings: [(doc_id, ((tf)**2)) for doc_id, tf in term_postings[1]])
# sum the squared tfs of each document (reduceByKey on doc_id)
doc_sum_tf = doc_tf_pairs.reduceByKey(lambda tf1, tf2: tf1 + tf2)
# get square root
doc_sqrt_sum_tf = doc_sum_tf.mapValues(lambda sum_tf: math.sqrt(sum_tf))

In [17]:
# Convert the per-document norms RDD of (doc_id, sqrt(sum tf^2)) pairs to a DataFrame.
# The RDD already holds 2-tuples, so no identity map is needed before toDF.
doc_sqrt_sum_tf_df = doc_sqrt_sum_tf.toDF(["doc_id", "sqr_root"])
# Save the DataFrame as a CSV file (no header option is set, so no header row is written)
doc_sqrt_sum_tf_df.write.csv("gs://ass3_new/title_index/doc_sqrt_sum_tf.csv")

                                                                                

In [19]:
# Create inverted index instance
inverted = InvertedIndex()
# Add the components to the index
inverted.df = w2df_dict
# `tf_idf_dict` is not defined anywhere above in this notebook, so this cell
# raised NameError on a fresh kernel. Build it from the tf_idf RDD in the same
# shape tfidfByTerm returns: term -> [(doc_id, tf_idf), ...].
# NOTE(review): this collects every (term, doc) weight to the driver — make
# sure driver memory can hold it.
tf_idf_dict = tf_idf.groupByKey().mapValues(list).collectAsMap()
inverted.tf_idf = tf_idf_dict
inverted.docs_len = docs_len_dict

# PageRank

**YOUR TASK (10 POINTS):** Compute PageRank for the entire English Wikipedia. Use your implementation of the `generate_graph` function from Colab below.

In [19]:
def generate_graph(pages):
  ''' Compute the directed graph generated by wiki links.
  Parameters:
  -----------
    pages: RDD
      An RDD where each row consists of one wikipedia article with 'id' and
      'anchor_text' (a sequence of links, each exposing its target page as
      `.id`). Rows whose anchor_text is null or empty contribute no edges.
  Returns:
  --------
    edges: RDD
      An RDD where each row represents an edge in the directed graph created by
      the wikipedia links. The first entry is the source page id and the
      second entry is the destination page id. No duplicates are present.
    vertices: RDD
      An RDD where each row is a 1-tuple (page_id,) for a vertex (node) that
      is the source or destination of at least one edge. No duplicates are
      present.
  '''
  # One (source, destination) pair per link. A null anchor_text would make the
  # comprehension raise TypeError, so it is treated as "no links".
  edges = pages.flatMap(
      lambda row: [(row.id, link.id) for link in (row.anchor_text or [])]
  ).distinct()
  # Every endpoint of every edge is a vertex: flatten each (src, dst) pair into
  # its two ids, de-duplicate, and wrap as 1-tuples for toDF(['id']).
  vertices = edges.flatMap(lambda edge: edge).distinct().map(lambda v: (v,))
  return edges, vertices

In [23]:
t_start = time()
pages_links = parquetFile.select("id", "anchor_text").rdd
#pages_links = spark.read.parquet("gs://wikidata_preprocessed/*").select("id", "anchor_text").rdd
# construct the graph 
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())
# coalesce(1) instead of repartition(1): repartition performs a full shuffle
# that does not preserve the descending sort, whereas coalesce merges the
# already-sorted partitions without a shuffle, keeping the output file ranked.
pr.coalesce(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")
pr_time = time() - t_start
pr.show()

24/02/14 10:00:14 WARN YarnAllocator: Container from a bad node: container_1707900552804_0001_01_000017 on host: cluster-ass3-new-w-2.c.assignment3-413720.internal. Exit status: 143. Diagnostics: [2024-02-14 10:00:14.838]Container killed on request. Exit code is 143
[2024-02-14 10:00:14.839]Container exited with a non-zero exit code 143. 
[2024-02-14 10:00:14.839]Killed by external signal
.
24/02/14 10:00:14 ERROR YarnScheduler: Lost executor 15 on cluster-ass3-new-w-2.c.assignment3-413720.internal: Container from a bad node: container_1707900552804_0001_01_000017 on host: cluster-ass3-new-w-2.c.assignment3-413720.internal. Exit status: 143. Diagnostics: [2024-02-14 10:00:14.838]Container killed on request. Exit code is 143
[2024-02-14 10:00:14.839]Container exited with a non-zero exit code 143. 
[2024-02-14 10:00:14.839]Killed by external signal
.
24/02/14 10:00:14 WARN TaskSetManager: Lost task 30.0 in stage 12.0 (TID 729) (cluster-ass3-new-w-2.c.assignment3-413720.internal executor 

+-------+------------------+
|     id|          pagerank|
+-------+------------------+
|3434750| 9913.728782160777|
|  10568| 5385.349263642035|
|  32927|5282.0815757652745|
|  30680|  5128.23370960412|
|5843419| 4957.567686263868|
|  68253|4769.2782653551585|
|  31717| 4486.350180548308|
|  11867|4146.4146509127695|
|  14533|3996.4664408855006|
| 645042|3531.6270898037437|
|  17867|3246.0983906041415|
|5042916| 2991.945739166179|
|4689264| 2982.324883041748|
|  14532|2934.7468292031704|
|  25391| 2903.546223513398|
|   5405| 2891.416329154637|
|4764461|2834.3669873326608|
|  15573|2783.8651181588366|
|   9316|2782.0396464137693|
|8569916|2775.2861918400167|
+-------+------------------+
only showing top 20 rows



                                                                                

In [26]:
# test that the PageRank computation (including the CSV write) took less than 2 hours
# NOTE(review): the original comment said "1 hour", but the threshold below is
# 2*60*60 seconds — confirm which limit is intended.
assert pr_time < 2*60*60

# Reporting

**YOUR TASK (5 points):** execute and complete the following lines to complete 
the reporting requirements for assignment #3. 

In [27]:
# size of input data
# (total size of the preprocessed Wikipedia data in GCS)
!gsutil du -sh "gs://wikidata_preprocessed/"

14.28 GiB    gs://wikidata_preprocessed


In [28]:
# size of index data
# (presumably the posting files written by InvertedIndex under postings_gcp/ — confirm)
index_dst = f'gs://{bucket_name}/postings_gcp/'
# IPython expands `$index_dst` before running the shell command
!gsutil du -sh "$index_dst"

5.92 GiB     gs://ass3_new/postings_gcp


In [29]:
# How many USD credits did you use in GCP during the course of this assignment?
# (manually entered figure; not computed by the notebook)
cost = 25.8
print(f'I used {cost} USD credit during the course of this assignment')

I used 25.8 USD credit during the course of this assignment


**Bonus (10 points)** if you implement PageRank in pure PySpark, i.e. without using the GraphFrames package, AND manage to complete 10 iterations of your algorithm on the entire English Wikipedia in less than an hour. 


In [20]:
#If you have decided to do the bonus task - please copy the code here 

bonus_flag = False # Turn flag on (True) if you have implemented this part

t_start = time()

# PLACE YOUR CODE HERE
# (bonus not implemented: this cell currently times an empty block)

pr_time_Bonus = time() - t_start


In [21]:
# Note: test that the bonus PageRank computation took less than 1 hour
# NOTE(review): because of `and bonus_flag`, this assertion always fails while
# bonus_flag is False, so "Run All" stops here when the bonus is not attempted.
assert pr_time_Bonus < 60*60 and bonus_flag