In [1]:
# if the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-6137  GCE       2                                             RUNNING  us-central1-a


# Imports & Setup

In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import json

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [4]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar 10 11:35 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [6]:
spark

In [7]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = '209092196_212080188_209318666'
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name != 'graphframes.sh':
        paths.append(full_path+b.name)

***GCP setup is complete!*** If you got here without any errors you've earned 10 out of the 35 points of this part.

# Building an inverted index

Here, we read the entire corpus to an rdd, directly from Google Storage Bucket and use your code from Colab to construct an inverted index.

In [8]:
parquetFile = spark.read.parquet(*paths).limit(1000)
doc_text_pairs = parquetFile.select("text", "id").rdd



We will count the number of pages to make sure we are looking at the entire corpus. The number of pages should be more than 6M

In [9]:
# Count number of wiki pages
parquetFile.count()

                                                                                

1000

In [10]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [11]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [12]:
from inverted_index_gcp import InvertedIndex

In [13]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
    return int(_hash(token),16) % NUM_BUCKETS


def filter_tokens(tokens, tokens2remove=None, use_stemming=True):
    stemmer = PorterStemmer()
    if tokens2remove is not None:
        tokens = [tok for tok in tokens if tok not in tokens2remove]
    if use_stemming:
        tokens = [stemmer.stem(tok) for tok in tokens]
    return tokens

def word_count(text, id):
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    tokens = filter_tokens(tokens,all_stopwords, True)
    freq_dict = {word: tokens.count(word) for word in tokens if word not in all_stopwords}
    ret = [(word, (id, freq_dict[word])) for word in freq_dict.keys()]
    return ret

def word_count_anchors(id, text):
    return word_count(tokenize_anchor(text), id)

def reduce_word_counts(unsorted_pl):
    return sorted(unsorted_pl)

def calculate_df(postings):
    return postings.mapValues(lambda posting_lst: len(posting_lst))

def calculate_tf(postings):
    return postings.flatMap(lambda x: [(x[0],len(x[1]))]).aggregateByKey(0, lambda x,y: x+y, lambda x,y: x+y)

def partition_postings_and_write(postings, base_dir):
    rdd = postings.groupBy(lambda x: token2bucket_id(x[0]))
    return(rdd.map(lambda x: InvertedIndex.write_a_posting_list(x, base_dir, bucket_name)))


# def partition_postings_and_write(postings):
#     return postings.map(lambda posting: (token2bucket_id(posting[0]), [(posting[0], posting[1])])).reduceByKey(lambda x, y: x + y).map(lambda posting: InvertedIndex.write_a_posting_list(posting,"postings_gcp/"
# ,bucket_name))

def get_docs_length(postings):
    return postings.flatMap(lambda x:[(curr_id,count) for curr_id,count in x[1]]).aggregateByKey(0, lambda x,y: x+y, lambda x,y: x+y)


def tokenize_anchor(anchors):
    tokens = [token[1] for token in anchors]
    return ' '.join(tokens)



In [16]:
# TITLE
docs_title_pairs = parquetFile.select("id","title")
docs_title_pairs_dict = dict(docs_title_pairs.collect())
with open("titles.json", "w") as _json:
    json.dump(docs_title_pairs_dict, _json)
title_src = "titles.json"
title_dst = f"gs://{bucket_name}/titles/{title_src}"

In [17]:
!gsutil cp $title_src $title_dst

Copying file://titles.json [Content-Type=application/json]...
/ [1 files][ 33.0 KiB/ 33.0 KiB]                                                
Operation completed over 1 objects/33.0 KiB.                                     


In [18]:
rdd=docs_title_pairs.rdd
word_counts_title = rdd.flatMap(lambda x: word_count(x[1], x[0]))
postings_titles = word_counts_title.groupByKey().mapValues(reduce_word_counts)
t2df_title = calculate_df(postings_titles).collectAsMap()
title_length = get_docs_length(postings_titles)
total_title = calculate_tf(postings_titles)

                                                                                

In [20]:
_ = partition_postings_and_write(postings_titles, "titles").collect()

                                                                                

In [20]:
# collect all posting lists locations into one super-set
titles_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='titles'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            titles_posting_locs[k].extend(v)

In [21]:
# Create inverted index instance
inverted = InvertedIndex()
inverted.posting_locs = titles_posting_locs
inverted.df = t2df_title
inverted.doc_length = title_length.collectAsMap()
inverted.term_total = total_title.collectAsMap()
inverted.write_index('.', 'title_index')
index_src = "title_index.pkl"
index_dst = f'gs://{bucket_name}/titles/{index_src}'

In [22]:
!gsutil cp $index_src $index_dst

Copying file://title_index.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 44.0 KiB/ 44.0 KiB]                                                
Operation completed over 1 objects/44.0 KiB.                                     


In [23]:
# BODY
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
postings_filtered = postings.filter(lambda x: len(x[1])>50)
total_body = calculate_tf(postings_filtered)
body_length = get_docs_length(postings_filtered)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()

                                                                                

In [24]:
_ = partition_postings_and_write(postings_filtered, "body").collect()

                                                                                

In [25]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='body'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)

Putting it all together

In [26]:
# Create inverted index instance
inverted = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
inverted.doc_length = body_length.collectAsMap()
inverted.term_total = total_body.collectAsMap()
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'index')
# upload to gs
index_src = "index.pkl"
index_dst = f'gs://{bucket_name}/body/{index_src}'
!gsutil cp $index_src $index_dst

Copying file://index.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 32.8 KiB/ 32.8 KiB]                                                
Operation completed over 1 objects/32.8 KiB.                                     


In [27]:
!gsutil ls -lh $index_dst

 32.79 KiB  2024-03-10T12:07:15Z  gs://209092196_212080188_209318666/body/index.pkl
TOTAL: 1 objects, 33572 bytes (32.79 KiB)


# PageRank

In [28]:
# Put your `generate_graph` function here
def generate_graph(pages):
    edges = pages.flatMap(lambda page: [(page[0], link[0]) for link in page[1]])
    edges = edges.distinct()
    vertices_src = pages.map(lambda page: (page[0],))
    vertices_anchr = pages.flatMap(lambda page: page[1]).map(lambda link: (link[0],))
    vertices = vertices_src.union(vertices_anchr).distinct()
    return edges, vertices

In [29]:
pages_links = parquetFile.select ("id","anchor_text").rdd
# construct the graph
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())




In [30]:
#check later
pr_dict = {int(row[0]): row[1] for _, row in pr.toPandas().iterrows()}
with open("pr.json", "w") as pr:
    json.dump(pr_dict, pr)

                                                                                

In [31]:
pr_src = "pr.json"
pr_dst = f'gs://{bucket_name}/pr/{pr_src}'
!gsutil cp $pr_src $pr_dst

Copying file://pr.json [Content-Type=application/json]...
/ [1 files][689.1 KiB/689.1 KiB]                                                
Operation completed over 1 objects/689.1 KiB.                                    


In [32]:
# ANCHOR
anchors_pairs = parquetFile.select ("id","anchor_text").rdd
word_count_anchor = anchors_pairs.flatMap(lambda x: word_count_anchors(x[0], x[1]))
anchor_postings = word_count_anchor.groupByKey().mapValues(reduce_word_counts)
#anchor_postings.first()
w2df = calculate_df(anchor_postings)
anchor_length = get_docs_length(anchor_postings)
w2df_dict = w2df.collectAsMap()
total_anchor = calculate_tf(anchor_postings)
# partition posting lists and write out
# calculate total terms

                                                                                

In [33]:
_ = partition_postings_and_write(anchor_postings, "anchor").collect()

                                                                                

In [34]:
# collect all posting lists locations into one super-set
anchor_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='anchor'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            anchor_posting_locs[k].extend(v)

In [35]:
# Create inverted index instance
inverted = InvertedIndex()
inverted.posting_locs = anchor_posting_locs
inverted.doc_length = anchor_length.collectAsMap()
inverted.df = w2df_dict
inverted.term_total = total_anchor.collectAsMap()
inverted.write_index('.', 'anchor_index')
index_src = "anchor_index.pkl"
index_dst = f'gs://{bucket_name}/anchor/{index_src}'

In [37]:
!gsutil cp $index_src $index_dst

997.88 KiB  2024-03-10T12:17:13Z  gs://209092196_212080188_209318666/anchor/anchor_index.pkl
TOTAL: 1 objects, 1021833 bytes (997.88 KiB)
