# Imports & Setup

In [None]:
# Pin google-cloud-storage and install graphframes (GraphFrame is used below for PageRank).
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage

import hashlib
def _hash(s):
    """Return the hex BLAKE2b digest (5-byte digest, 10 hex chars) of the UTF-8 bytes of s."""
    data = bytes(s, encoding='utf8')
    hasher = hashlib.blake2b(data, digest_size=5)
    return hasher.hexdigest()

# Fetch NLTK's stopword corpus; stopwords.words('english') is read in the tokenizer cell.
nltk.download('stopwords')

In [None]:
# List the GraphFrames jars available to Spark.
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Display the active SparkSession. `spark` is not created in this notebook; presumably
# the kernel provides it — confirm on the cluster.
spark

In [None]:
# Put your bucket name below and make sure you can access it without an error.
bucket_name = '206921116_ass3'
full_path = f"gs://{bucket_name}/"

# Gather the gs:// URI of every .parquet object in the bucket; spark.read.parquet
# consumes this list in the next cell.
client = storage.Client()
blobs = client.list_blobs(bucket_name)
paths = [full_path + blob.name for blob in blobs if blob.name.endswith('.parquet')]

Read the data

In [None]:
# Load every parquet file listed in `paths` into one DataFrame.
parquetFile = spark.read.parquet(*paths)
#text:
# RDD of (text, id) rows.
doc_text_pairs = parquetFile.select("text", "id").rdd

#title:
# RDD of (title, id) rows.
doc_title_pairs = parquetFile.select("title", "id").rdd

#anchor:
# RDD of (anchor_text, id) rows; each anchor_text entry is read below via its 'text' and 'id' fields.
doc_anchor_pairs = parquetFile.select("anchor_text", "id").rdd
# Flatten to one (anchor text, anchor id) pair per anchor entry. The anchor id is
# presumably the id of the page the anchor links to — confirm against the dump schema.
doc_anchor_pairs1 = doc_anchor_pairs.flatMap(lambda x: [(i['text'],i['id']) for i in x['anchor_text']])

Tokenizer functions

In [None]:
#tokenize WITH STEMMER----------------------------------------------------------------------------------------------
from nltk.stem import PorterStemmer
english_stopwords = frozenset(stopwords.words('english'))
# NOTE(review): 'extenal' looks like a typo for 'external'. It is left unchanged so the
# tokenization stays identical to any query-side copy of this list; fix both together.
corpus_stopwords = ['category', 'references', 'also', 'links', 'extenal', 'see', 'thumb']
# Token pattern for the stemming tokenizer: starts with #, @ or a word character and may
# contain internal apostrophes, hyphens, commas or dots.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?[\w,]?[\w.]?(?:['\-]?[\w,]?[\w])){0,24}""", re.UNICODE)
# Token pattern for tokenize_no_stem: #, @ or a word character followed by 2-24 word
# characters, each optionally preceded by an apostrophe or hyphen.
RE_WORD_OLD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)
stopwords_frozen = frozenset(stopwords.words('english'))
all_stopwords = english_stopwords.union(corpus_stopwords)
# Build the stemmer once instead of constructing a new one on every tokenize() call
# (tokenize runs once per document inside the Spark jobs below).
porter_stemmer = PorterStemmer()

def tokenize(text):
    """
    Tokenize a text with RE_WORD, drop stopwords and Porter-stem the remaining tokens.

    Parameters:
    -----------
    text: string, the text to tokenize.

    Returns:
    -----------
    list of stemmed tokens in order of appearance. Stopwords (all_stopwords) are
    removed from the lower-cased tokens *before* stemming.
    """
    lowered = text.lower()
    return [porter_stemmer.stem(match.group())
            for match in RE_WORD.finditer(lowered)
            if match.group() not in all_stopwords]

In [None]:
def tokenize_no_stem(text):
    """
    Tokenize a text with RE_WORD_OLD and drop stopwords, without stemming.

    Parameters:
    -----------
    text: string, the text to tokenize.

    Returns:
    -----------
    list of lower-cased tokens in order of appearance, excluding all_stopwords.
    """
    return [match.group() for match in RE_WORD_OLD.finditer(text.lower())
            if match.group() not in all_stopwords]

**general**

NF (per-document normalization factor for TF-IDF cosine similarity)

In [None]:
# Copy the body document-length dict and the non-stemmed body index folder from the
# bucket into the current working directory.
!gsutil -m cp -r gs://206921116_ass3/general/DL_dict_body.pickle .

!gsutil -m cp -r gs://206921116_ass3/body_index .

In [None]:
# Load the body document-length dict (DL) and the non-stemmed body index copied above.
# NOTE(review): InvertedIndex is imported only in a later cell ("import inverted index
# class"), and DL_dict_body.pickle is produced by the DL cells further down; this cell
# assumes those already ran in this session — confirm execution order.
with open(Path('.') / f'DL_dict_body.pickle', 'rb') as f:
    DL = pickle.load(f)
body_index = InvertedIndex.read_index("body_index", "body_index")

In [None]:
# NF for COSINE SIMILARITY WITH TFIDF on NON STEM body
from collections import Counter
import math


def all_words_in_doc(text):
    """Return a Counter mapping each non-stemmed, non-stopword token of text to its count."""
    return Counter(tokenize_no_stem(text))


def get_nf(id, count):
    """Return sqrt(sum of (tf * idf) ** 2) over the document's indexed terms.

    id: document id (not used in the computation).
    count: mapping term -> raw count in the document (see all_words_in_doc).

    Terms missing from body_index.df are skipped. For the rest,
    tf = count[term] / len(DL) and idf = log10(len(DL) / df[term]).
    NOTE(review): tf is divided by len(DL), i.e. the number of documents in DL, not
    by this document's length — confirm this matches the query-time weighting.
    """
    num_docs = len(DL)
    squares = []
    for term in count:
        if term not in body_index.df:
            continue
        weight = (count[term] / num_docs) * math.log(num_docs / body_index.df[term], 10)
        squares.append(weight ** 2)
    return math.sqrt(sum(squares))


# (doc_id, term Counter) for every body text, then (doc_id, NF).
res = doc_text_pairs.map(lambda row: (row[1], all_words_in_doc(row[0])))
final_r = res.map(lambda pair: (pair[0], get_nf(pair[0], pair[1])))

In [None]:
#collect final_r for pickle
# Materialize the (doc_id, NF) pairs on the driver as a dict.
a=final_r.collectAsMap()

In [None]:
#write to bucket
pickle.dump(final_r, open("doc_nf.pickle", "wb"))
!gsutil cp doc_nf.pickle gs://206921116_ass3/general/doc_nf.pickle

DL (document lengths, in tokens)

In [None]:
## code for all DLs
def len_of_doc(t):
    """Return how many tokens tokenize() (stemmed, stopwords removed) yields for t."""
    return len(tokenize(t))

# DL for body: doc_id -> token count of the body text.
a_body = doc_text_pairs.map(lambda x: (x[1], len_of_doc(x[0])))
# Collect a_body itself; `a` at this point holds unrelated data from earlier cells.
b_body = a_body.collectAsMap()

#DL for title: doc_id -> token count of the title.
a = doc_title_pairs.map(lambda x: (x[1], len_of_doc(x[0])))
b = a.collectAsMap()


In [None]:
#download all
#body
pickle.dump(b_body, open("DL_dict_body.pickle", "wb"))
!gsutil cp DL_dict_body.pickle gs://206921116_ass3/general/DL_dict_body.pickle
#title
pickle.dump(b, open("DL_dict_title.pickle", "wb"))
!gsutil cp DL_dict_title.pickle gs://206921116_ass3/general/DL_dict_title.pickle

doc_title_pairs_bykey: a dict for looking up a document's title by its ID

In [None]:
# create dict for id -> doc title - doesn't matter if STEM
doc_title_pairs_bykey = doc_title_pairs1.sortByKey()
b = doc_title_pairs_bykey.collectAsMap()

#pickle and save in bucket
pickle.dump(b, open("doc_title_pairs_bykey.pickle", "wb"))
!gsutil cp doc_title_pairs_bykey.pickle gs://206921116_ass3/general/doc_title_pairs_bykey.pickle

Import the inverted index class

In [None]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
# Quietly switch to /home/dataproc and check that the module file is there.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

In [None]:
# adding our python module to the cluster
# Register the module file with Spark via sc.addFile (`sc` is not created in this
# notebook; presumably the kernel's SparkContext). Also put the SparkFiles root
# directory first on sys.path so `from inverted_index_gcp import ...` resolves.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import InvertedIndex

Helper functions shared by all indexes

In [None]:
#for all---------------------------------------
# Number of buckets that posting lists are partitioned into before being written.
NUM_BUCKETS = 124


def token2bucket_id(token):
    """Map token to a bucket id in [0, NUM_BUCKETS) using its _hash digest."""
    hashed = int(_hash(token), 16)
    return hashed % NUM_BUCKETS

def word_count(text, id):
  """Count the stemmed terms of one document and emit one posting entry per term.

  Parameters:
  -----------
  text: string, the document text (tokenized with tokenize, i.e. stemmed).
  id: the document id stored in each posting entry.

  Returns:
  -----------
  list of (token, (id, tf)) tuples, one per distinct non-stopword token,
  ordered by each token's first occurrence in text.
  """
  tokens = tokenize(text)
  # tokenize() removes stopwords before stemming, so stemmed tokens are
  # re-checked here. Counter keeps first-insertion order, so the output order
  # matches a first-occurrence scan without a quadratic "already added" list.
  counts = Counter(token for token in tokens if token not in all_stopwords)
  return [(token, (id, tf)) for token, tf in counts.items()]

def word_count_no_stem(text, id):
  """Count the non-stemmed terms of one document and emit one posting entry per term.

  Parameters:
  -----------
  text: string, the document text (tokenized with tokenize_no_stem).
  id: the document id stored in each posting entry.

  Returns:
  -----------
  list of (token, (id, tf)) tuples, one per distinct non-stopword token,
  ordered by each token's first occurrence in text.
  """
  tokens = tokenize_no_stem(text)
  # Counter keeps first-insertion order, so the output order matches a
  # first-occurrence scan without a quadratic "already added" list.
  counts = Counter(token for token in tokens if token not in all_stopwords)
  return [(token, (id, tf)) for token, tf in counts.items()]

def reduce_word_counts(unsorted_pl):
  """Return a new list containing the postings of unsorted_pl in ascending order.

  Postings are (doc_id, tf) tuples, so the result is ordered by doc_id, with ties
  broken by tf.
  """
  ordered = list(unsorted_pl)
  ordered.sort()
  return ordered

def calculate_df(postings):
  """Map each (term, posting_list) entry to (term, len(posting_list)), i.e. its df."""
  def _to_df(entry):
    return (entry[0], len(entry[1]))
  return postings.map(_to_df)

def partition_postings_and_write(postings, index_name):
  """Group postings by token2bucket_id of their token and write each group.

  postings: RDD of (token, posting_list) pairs.
  index_name: passed to InvertedIndex.write_a_posting_list together with the
      global bucket_name.

  Returns the lazy RDD of write_a_posting_list results, one per bucket; callers
  .collect() it to actually trigger the writes.
  """
  def _bucket_of(entry):
    return token2bucket_id(entry[0])

  def _write_bucket(bucket):
    return InvertedIndex.write_a_posting_list(bucket, index_name, bucket_name)

  grouped = postings.groupBy(_bucket_of)
  return grouped.map(_write_bucket)

**body index**

STEM

In [None]:
#text-----------------------------------------------------------------------
# time the index creation time
# NOTE(review): nothing is actually timed in this cell.
# word counts map
# (token, (doc_id, tf)) for every stemmed token of every body text.
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
# token -> posting list sorted by doc_id.
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
# Keep only tokens whose posting list has more than 50 documents.
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered, "stem_body_index").collect()

In [None]:
#text-----------------------------------------
# collect all posting lists locations into one super-set
# Merge the dicts from every blob under the 'stem_body_index' prefix whose name ends
# in "pickle" into one token -> list-of-locations mapping (the .pkl index file is skipped).
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='stem_body_index'):
  if not blob.name.endswith("pickle"):
    continue
  # Name guard; every blob under this prefix already contains "body".
  if "body" not in blob.name:
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [None]:
#text--------------------------------------------------------------------------
# Create inverted index instance
inverted_text = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_text.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted_text.df = w2df_dict
# write the global stats out
# NOTE(review): `path` is assigned but not used; write_index is given '.' as its directory.
path = f'gs://{bucket_name}/stem_body_index'
inverted_text.write_index('.', 'stem_body_index')
# upload to gs
# presumably write_index produced stem_body_index.pkl locally — confirm in inverted_index_gcp.py
index_src = "stem_body_index.pkl"
index_dst = f'gs://{bucket_name}/stem_body_index/{index_src}'
!gsutil cp $index_src $index_dst

NON STEM

In [None]:
#text NON STEM-----------------------------------------------------------------------
# time the index creation time
# NOTE(review): nothing is actually timed in this cell.
# word counts map
# (token, (doc_id, tf)) for every non-stemmed token of every body text.
word_counts = doc_text_pairs.flatMap(lambda x: word_count_no_stem(x[0], x[1]))
# token -> posting list sorted by doc_id.
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
# Keep only tokens whose posting list has more than 50 documents.
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered, "body_index").collect()

In [None]:
#text-----------------------------------------
# collect all posting lists locations into one super-set
# Merge the dicts from every blob under the 'body_index' prefix whose name ends in
# "pickle" into one token -> list-of-locations mapping (the .pkl index file is skipped).
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='body_index'):
  if not blob.name.endswith("pickle"):
    continue
  # Name guard; every blob under this prefix already contains "body".
  if "body" not in blob.name:
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [None]:
#text--------------------------------------------------------------------------
# Create inverted index instance
inverted_text = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_text.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted_text.df = w2df_dict
# write the global stats out
# NOTE(review): `path` is assigned but not used; write_index is given '.' as its directory.
path = f'gs://{bucket_name}/body_index'
inverted_text.write_index('.', 'body_index')
# upload to gs
# presumably write_index produced body_index.pkl locally — confirm in inverted_index_gcp.py
index_src = "body_index.pkl"
index_dst = f'gs://{bucket_name}/body_index/{index_src}'
!gsutil cp $index_src $index_dst

TITLE

STEM

In [None]:
#title-----------------------------------------------------------------------
# time the index creation time
# NOTE(review): nothing is actually timed in this cell.
# word counts map
# (token, (doc_id, tf)) for every stemmed token of every title.
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))
# token -> posting list sorted by doc_id.
postings_title = word_counts_title.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
# NOTE(review): unlike the body index, title postings are not filtered by df.
w2df_title = calculate_df(postings_title)
w2df_dict_title = w2df_title.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_title, "stem_title_index").collect()

In [None]:
#title-----------------------------------------
# collect all posting lists locations into one super-set
# Merge the dicts from every blob under the 'stem_title_index' prefix whose name ends
# in "pickle" into one token -> list-of-locations mapping (the .pkl index file is skipped).
super_posting_locs_title = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='stem_title_index'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs_title[k].extend(v)

In [None]:
#title---------------------------------------------------------------------------
# Create inverted index instance
inverted_title = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_title.posting_locs = super_posting_locs_title
# Add the token - df dictionary to the inverted index
inverted_title.df = w2df_dict_title
# write the global stats out
inverted_title.write_index('.', 'stem_title_index')
# upload to gs
# presumably write_index produced stem_title_index.pkl locally — confirm in inverted_index_gcp.py
index_src = "stem_title_index.pkl"
index_dst = f'gs://{bucket_name}/stem_title_index/{index_src}'
!gsutil cp $index_src $index_dst

NO STEM

In [None]:
#title-----------------------------------------------------------------------
# time the index creation time
# NOTE(review): nothing is actually timed in this cell.
# word counts map
# (token, (doc_id, tf)) for every non-stemmed token of every title.
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count_no_stem(x[0], x[1]))
# token -> posting list sorted by doc_id.
postings_title = word_counts_title.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
# NOTE(review): unlike the body index, title postings are not filtered by df.
w2df_title = calculate_df(postings_title)
w2df_dict_title = w2df_title.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_title, "title_index").collect()

In [None]:
#title-----------------------------------------
# collect all posting lists locations into one super-set
# Merge the dicts from every blob under the 'title_index' prefix whose name ends in
# "pickle" into one token -> list-of-locations mapping (the .pkl index file is skipped).
super_posting_locs_title = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='title_index'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs_title[k].extend(v)

In [None]:
#title---------------------------------------------------------------------------
# Create inverted index instance
inverted_title = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_title.posting_locs = super_posting_locs_title
# Add the token - df dictionary to the inverted index
inverted_title.df = w2df_dict_title
# write the global stats out
inverted_title.write_index('.', 'title_index')
# upload to gs
# presumably write_index produced title_index.pkl locally — confirm in inverted_index_gcp.py
index_src = "title_index.pkl"
index_dst = f'gs://{bucket_name}/title_index/{index_src}'
!gsutil cp $index_src $index_dst

ANCHOR

NO STEM

In [None]:
#anchor NOT STEM------------
# (token, anchor id) for every non-stemmed token of every anchor text.
x=doc_anchor_pairs1.flatMap(lambda x: [(i, x[1]) for i in tokenize_no_stem(x[0])])
# token -> list of anchor ids, one entry per occurrence.
y=x.groupByKey().mapValues(list)
# token -> [(anchor id, count)], in the order ids appear in the grouped list
# (not sorted by id, unlike the body/title postings built with reduce_word_counts).
z=y.map(lambda x: (x[0],list(Counter(x[1]).items())))

In [None]:
#anchor NO STEM----------------------------------------------------------------------
# df per token = number of distinct anchor ids in its posting list.
w2df_anch=calculate_df(z).collectAsMap()
# Write the anchor posting lists (no df filtering, unlike the body index).
posting_locs_list_anch = partition_postings_and_write(z, "anchor_index").collect()

# collect all posting lists locations into one super-set
# Merge the dicts from every blob under the 'anchor_index' prefix whose name ends in
# "pickle" into one token -> list-of-locations mapping (the .pkl index file is skipped).
super_posting_locs_anchor = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='anchor_index'):
  if not blob.name.endswith("pickle"):
    continue
  # Name guard; every blob under this prefix already contains "anchor".
  if "anchor" not in blob.name:
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs_anchor[k].extend(v)

In [None]:
#anchor NO STEM--------------------------------------------------------------------------
# Create inverted index instance
inverted_anch = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_anch.posting_locs = super_posting_locs_anchor
# Add the token - df dictionary to the inverted index
inverted_anch.df = w2df_anch
# write the global stats out
inverted_anch.write_index('.', 'anchor_index')
# upload to gs
# presumably write_index produced anchor_index.pkl locally — confirm in inverted_index_gcp.py
index_src = "anchor_index.pkl"
index_dst = f'gs://{bucket_name}/anchor_index/{index_src}'
!gsutil cp $index_src $index_dst

Example: how to download an index folder from the bucket to the local machine

In [None]:
# Recursively copy the bucket's anchor_index folder into the current directory.
!gsutil cp -r gs://206921116_ass3/anchor_index . # download all anchor folder

Example: how to load a pickle file

In [None]:
# Load the PageRank dict (id -> pagerank) from a local general/ folder; this assumes
# general/page_rank_dict.pickle was copied down from the bucket first.
with (Path("general") / 'page_rank_dict.pickle').open('rb') as f:
    pr = pickle.load(f)

The read_posting_list function

In [None]:
from inverted_index_gcp import MultiFileReader
from inverted_index_gcp import MultiFileWriter

# Bytes per stored posting: a 4-byte big-endian doc_id followed by a 2-byte big-endian tf.
TUPLE_SIZE = 6
TF_MASK = 2 ** 16 - 1 # Masking the 16 low bits of an integer
from contextlib import closing

def read_posting_list(inverted, w, index_name):
  """Read and decode the posting list of term w.

  inverted: index object providing posting_locs[w] and df[w].
  w: the term to look up.
  index_name: passed through to MultiFileReader.read.

  Returns a list of inverted.df[w] (doc_id, tf) tuples, decoded from
  TUPLE_SIZE-byte records.
  """
  with closing(MultiFileReader()) as reader:
    locs = inverted.posting_locs[w]
    n_postings = inverted.df[w]
    raw = reader.read(locs, n_postings * TUPLE_SIZE, index_name)
    return [
        (int.from_bytes(raw[off:off + 4], 'big'),
         int.from_bytes(raw[off + 4:off + TUPLE_SIZE], 'big'))
        for off in range(0, n_postings * TUPLE_SIZE, TUPLE_SIZE)
    ]


#pl = read_posting_list(inverted_anch, 'horse', "anchor_index")
#print(pl)

PageRank

In [None]:
#graph for page rank--------------------------------------------------------------------------------

# Re-read the corpus; only each page's id and its anchor_text list are needed here.
parquetFile = spark.read.parquet(*paths)
pages_links = parquetFile.select("id", "anchor_text").rdd

# One deduplicated edge (page id, w[0]) per anchor entry. w[0] is the first field of the
# anchor struct — presumably the linked page's id; confirm against the schema.
edges= pages_links.flatMap(lambda x: [(x[0], w[0]) for w in x[1]]).distinct()
# Every id appearing as an edge endpoint becomes a vertex (pages with no edges are absent).
vertices= edges.flatMap(lambda x: [(x[0],),(x[1],)]).distinct()
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
# Reset probability 0.15, 6 iterations.
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
# Highest PageRank first.
pr = pr.sort(col('pagerank').desc())
#pr.repartition(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")
pr.show()

In [None]:
#convert df to rdd and create dict
# Rows are (id, pagerank), so collectAsMap yields {id: pagerank} on the driver.
pr_rdd = pr.rdd
pr_dict = pr_rdd.collectAsMap()

In [None]:
# write page rank dict to bucket
pickle.dump(pr_dict, open("page_rank_dict.pickle", "wb"))
!gsutil cp page_rank_dict.pickle gs://206921116_ass3/general/page_rank_dict.pickle

Page views

In [None]:
#for page_view--------------------------------------------------------------------------------
# Paths
# Using user page views (as opposed to spiders and automated traffic) for the 
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB) 
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly 
# total number of page views (5). Then, remove lines with article id or page 
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the pages views for the same 
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
  for line in f:
    parts = line.split(' ')
    wid2pv.update({int(parts[0]): int(parts[1])})

In [None]:
pickle.dump(wid2pv, open("page_view_stats.pickle", "wb"))
!gsutil cp page_view_stats.pickle gs://206921116_ass3/general/page_view_stats.pickle