In [None]:
# List the Dataproc clusters in region us-central1.
!gcloud dataproc clusters list --region us-central1

# Imports & Setup

In [None]:
# google-cloud-storage is pinned; NOTE(review): graphframes is installed unpinned — consider pinning it too.
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
from collections import Counter
from nltk.corpus.reader.rte import norm

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

In [None]:
# List Spark's jars whose names start with "graph" (presumably to confirm the GraphFrames jar is installed).
!ls -l /usr/lib/spark/jars/graph*

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

# Load the data

In [None]:
full_path = "gs://wikidata_preprocessed/*"
parquetFile = spark.read.parquet(full_path)

# One (field, id) row RDD per text field of the preprocessed Wikipedia parquet files.
doc_body_pairs, doc_title_pairs, doc_anchor_pairs = (
    parquetFile.select(column, "id").rdd for column in ("text", "title", "anchor_text")
)

In [None]:
# Work from /home/dataproc and confirm inverted_index_gcp.py is present there.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

In [None]:
# Distribute inverted_index_gcp.py to the cluster via SparkContext.addFile, and put the
# SparkFiles root directory first on sys.path so the module can be imported below.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import InvertedIndex
from inverted_index_gcp import MultiFileReader

# Preprocessing

In [None]:
# NLTK's English stopword list; tokens in this set are dropped during tokenization.
english_stopwords = frozenset(stopwords.words('english'))


def doc_count(id,text):
  '''Return (id, (term, tf)) pairs for every distinct term tokeniz_clean finds in `text`.

  Pairs are emitted in the order in which each term first appears.
  '''
  term_freqs = Counter(tokeniz_clean(text))
  return [(id, (term, freq)) for term, freq in term_freqs.items()]



# Built once at import time. tokeniz_clean runs once per document, so compiling the
# pattern and constructing a PorterStemmer on every call was avoidable overhead.
_RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)
_STEMMER = PorterStemmer()


def tokeniz_clean(text):
  '''Tokenize `text`, drop English stopwords, and Porter-stem the remaining tokens.

  Parameters:
  -----------
    text: str
      Raw text; it is lowercased before matching. None or "" yields [].
  Returns:
  --------
    list of str
      Stemmed tokens in their original order (duplicates kept). A token is a
      match of _RE_WORD: a '#', '@' or word character followed by 2-24 word
      characters, each optionally preceded by an apostrophe or hyphen.
  '''
  # Guard against missing text so one null record does not fail the whole Spark job.
  if not text:
    return []
  words = (match.group() for match in _RE_WORD.finditer(text.lower()))
  # The stopword check happens before stemming, on the lowercased surface form.
  return [_STEMMER.stem(word) for word in words if word not in english_stopwords]

def word_count(text, id):
  ''' Count the frequency of each term in `text` (tf) and return entries that will
  go into our posting lists.

  Terms come from tokeniz_clean: lowercased, with `english_stopwords` removed and
  Porter-stemmed.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (term, (doc_id, tf)) pairs, one per distinct term, in order of
      first occurrence; for example: [("anarch", (12, 5)), ...]
  '''
  # Count the terms as-is. tokeniz_clean has already dropped stopwords and stemmed
  # every token, and Porter stemming is not idempotent (stem("agreed") == "agre" but
  # stem("agre") == "agr"). Stemming a second time would make this vocabulary diverge
  # from the single-stemmed terms produced by tokeniz_clean and word_count2.
  term_freqs = Counter(tokeniz_clean(text))
  return [(term, (id, tf)) for term, tf in term_freqs.items()]

def len_doc(text):
  '''Return how many tokens tokeniz_clean produces for `text`.

  Duplicates are counted, so this is the document length after stopword
  removal and stemming.
  '''
  return len(tokeniz_clean(text))
  
def word_count2(text, id):
  '''Count adjacent-term bigrams of `text` and return posting-list entries.

  Returns a list of (bigram, (id, tf)) pairs, where a bigram is two consecutive
  tokeniz_clean terms joined by a single space, in order of first occurrence.
  '''
  terms = tokeniz_clean(text)
  bigram_counts = Counter(left + " " + right for left, right in zip(terms, terms[1:]))
  return [(bigram, (id, freq)) for bigram, freq in bigram_counts.items()]

import math 

def reduce_word_counts(unsorted_pl):
  '''Return the posting list as a new list sorted by tf (element [1]), highest first.

  The sort is stable, so postings with equal tf keep their incoming order.
  '''
  by_tf = list(unsorted_pl)
  by_tf.sort(key=itemgetter(1), reverse=True)
  return by_tf

def calculate_norm(norm1):
  '''Return the inverse L2 norm of each document's frequency vector.

  Parameters:
  -----------
    norm1: dict
      doc id -> iterable of pairs whose second element is a frequency
      (presumably (term, tf) — confirm against the caller).
  Returns:
  --------
    dict
      doc id -> 1 / sqrt(sum of squared frequencies). A document whose sum is 0
      (e.g. no entries) maps to 0.0 instead of raising ZeroDivisionError.
  '''
  doc_norm = {}
  for doc, freq_pairs in norm1.items():
    sum_sq = sum(pair[1] ** 2 for pair in freq_pairs)
    doc_norm[doc] = 1 / math.sqrt(sum_sq) if sum_sq else 0.0
  return doc_norm

def calculate_df(postings):
  '''Map (term, posting_list) records to (term, df) records.

  df is the length of the term's posting list. Returns a new (lazy) RDD.
  '''
  def _term_and_df(record):
    term, plist = record[0], record[1]
    return (term, len(plist))

  return postings.map(_term_and_df)



In [None]:
def token2bucket_id(token,num_buckets):
  '''Return the bucket index of `token`: its _hash digest read as hex, modulo num_buckets.

  For positive num_buckets the result lies in [0, num_buckets).
  '''
  digest_as_int = int(_hash(token), base=16)
  return digest_as_int % num_buckets

def write_bucket(posting,bucket_name,num_buckets):
  ''' Group posting lists into hash buckets and hand each bucket to
  InvertedIndex.write_a_posting_list.

  Parameters:
  -----------
    posting: RDD
      An RDD of (term, posting_list) pairs.
    bucket_name: str
      Passed through to write_a_posting_list as its bucket argument.
    num_buckets: int
      Number of hash buckets; a term goes to token2bucket_id(term, num_buckets).
  Returns:
  --------
    RDD
      One element per non-empty bucket: whatever write_a_posting_list returns for
      (bucket_id, [(term, posting_list), ...]). The RDD is lazy, so nothing is
      written until an action (e.g. collect()) runs on it.
  '''
  def _keyed_by_bucket(entry):
    term, plist = entry[0], entry[1]
    return token2bucket_id(term, num_buckets), [(term, plist)]

  per_bucket = posting.map(_keyed_by_bucket).reduceByKey(lambda left, right: left + right)
  writer = InvertedIndex()
  return per_bucket.map(lambda bucket: writer.write_a_posting_list(bucket, bucket_name))

def create_df_posting(doc_pairs,name,k,num_buckets,max_postings=150):
  ''' Count the terms of every document, build per-term posting lists, write them
  to GCS, and compute the document frequency (df) of each kept term.

  Parameters:
  -----------
    doc_pairs: RDD
      (text, doc_id) records.
    name: str
      Index name. "body2"/"title2" build a bigram index (word_count2); any other
      name builds a unigram index (word_count). Posting lists are written to the
      bucket "206065989_{name}".
    k: int
      Terms whose posting list has k or fewer documents are dropped.
    num_buckets: int
      Number of hash buckets passed to write_bucket.
    max_postings: int, default 150
      Only the max_postings highest-tf postings of each term are written.
  Returns:
  --------
    w2df: dict
      term -> df, counted over all documents containing the term (before the
      max_postings truncation).
    postings_filtered: RDD
      The (term, posting_list) records that were written; each list is sorted by
      tf descending and truncated to max_postings.
  '''
  if name in ("body2", "title2"):
    word_counts = doc_pairs.flatMap(lambda x: word_count2(x[0], x[1]))
  else:
    word_counts = doc_pairs.flatMap(lambda x: word_count(x[0], x[1]))
  # (term, [(doc_id, tf), ...]) with each list sorted by tf, highest first.
  postings = word_counts.groupByKey().mapValues(reduce_word_counts)
  # Drop rare terms, using the caller-supplied threshold k.
  postings_frequent = postings.filter(lambda x: len(x[1]) > k)
  # df must be taken before truncation; otherwise every df is capped at max_postings
  # and idf is wrong for common terms.
  w2df = calculate_df(postings_frequent).collectAsMap()
  postings_filtered = postings_frequent.mapValues(lambda pl: pl[:max_postings])
  _ = write_bucket(postings_filtered, f"206065989_{name}", num_buckets).collect()
  return w2df, postings_filtered

In [None]:
client = storage.Client()

def super_post(bucket_name,df):
    '''Merge the posting-location pickles stored in a GCS bucket into one dict.

    Reads every blob under the "postings_gcp" prefix of `bucket_name` whose name
    ends with "pickle" and unpickles it as a {term: locations} mapping. Only terms
    that are keys of `df` are kept, and their locations are appended to the result.

    Parameters:
    -----------
      bucket_name: str
        GCS bucket holding the postings_gcp/*pickle files.
      df: dict
        term -> document frequency. Keys filter the terms; values order them.
    Returns:
    --------
      defaultdict(list)
        term -> concatenated posting locations from all pickles. Within each pickle,
        terms are inserted in descending df order.
    '''
    super_posting_locs = defaultdict(list)
    for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
        if not blob.name.endswith("pickle"):
            continue
        # NOTE: pickle.load can execute arbitrary code; these blobs are presumably the
        # files written by write_bucket/InvertedIndex, not untrusted input.
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
        # Filter on the driver. A plain dict lookup replaces a per-file Spark job that
        # re-parallelized the whole df dict just to join against it.
        kept_terms = sorted((term for term in posting_locs if term in df),
                            key=lambda term: df[term], reverse=True)
        for term in kept_terms:
            super_posting_locs[term].extend(posting_locs[term])
    return super_posting_locs


In [None]:
#Manipulation on anchor text
# Rebuilt from `parquetFile` rather than from the previous value of
# `doc_anchor_pairs`, so re-running this cell no longer flattens twice.
# Each page's anchor_text list is flattened into its individual anchor entries;
# presumably each entry is (target_id, text) — generate_graph reads entry[0] as the link target.
doc_anchor_pairs = parquetFile.select("anchor_text", "id").rdd.flatMap(lambda row: row[0])
# Keep anchors whose text is longer than one character and whose first word is also
# longer than one character, then swap to (text, target_id) to match word_count(text, id).
doc_anchor_pairs_filter = (
    doc_anchor_pairs
    .filter(lambda a: len(a[1]) > 1 and len(a[1].split(" ")[0]) > 1)
    .map(lambda a: (a[1], a[0]))
)

# Create the inverted indexes

In [None]:
def _body_tfidf_norm(posting_filter, doc_len, df, num_doc):
    '''Return {doc_id: sum of squared tf-idf weights} over the kept body postings.

    A posting (doc_id, tf) of term t gets weight
    (tf / doc_len[doc_id]) * log2(num_doc / df[t]).
    '''
    # Broadcast the large lookup dicts once instead of pickling them into every task closure.
    doc_len_b = sc.broadcast(doc_len)
    df_b = sc.broadcast(df)

    def _squared_weights(entry):
        term, plist = entry[0], entry[1]
        idf = math.log2(num_doc / df_b.value[term])
        return [(doc_id, ((tf / doc_len_b.value[doc_id]) * idf) ** 2) for doc_id, tf in plist]

    norm = posting_filter.flatMap(_squared_weights).reduceByKey(lambda a, b: a + b).collectAsMap()
    doc_len_b.unpersist()
    df_b.unpersist()
    return norm


def crete_index(name, num_buckets=124):
    '''Build, write and upload the inverted index called `name`.

    - "body": unigram index over article text. It also stores doc_len (token count
      per doc id), num_doc, and norm (per doc, the sum of squared tf-idf weights).
    - "title": unigram index over titles. It also stores `title` (doc id -> title).
    - "anchor": unigram index over anchor text, keyed by the linked-to page id.
    - any other name (e.g. "title2", "title_pure"): index over titles;
      create_df_posting builds bigrams for "title2" and unigrams otherwise.

    Posting lists are written to bucket "206065989_{name}". Their locations are read
    back from that same bucket. The pickled index is uploaded to
    gs://206065989_{name}/postings_gcp/index_{name}.pkl.

    Parameters:
    -----------
      name: str
        Index name; also the bucket suffix.
      num_buckets: int, default 124
        Number of posting-list hash buckets.
    '''
    inverted = InvertedIndex()
    if name == "body":
        # NOTE(review): this previously read `dict_body_len`, which is not defined in this
        # notebook. Lengths are now computed with len_doc (same tokenizer as the postings);
        # confirm this matches the intended definition.
        inverted.doc_len = doc_body_pairs.map(lambda x: (x[1], len_doc(x[0]))).collectAsMap()
        inverted.num_doc = len(inverted.doc_len)
        inverted.df, posting_filter = create_df_posting(doc_body_pairs, "body", 50, num_buckets)
        # NOTE(review): stored as the sum of squared weights (no square root); confirm the
        # query side expects that.
        inverted.norm = _body_tfidf_norm(posting_filter, inverted.doc_len, inverted.df, inverted.num_doc)
    elif name == 'title':
        inverted.df, _ = create_df_posting(doc_title_pairs, "title", 0, num_buckets)
        # doc_title_pairs records are (title, id); keep id -> title.
        inverted.title = doc_title_pairs.map(lambda x: (x[1], x[0])).collectAsMap()
    elif name == 'anchor':
        # Use the (text, target_id) pairs from the anchor-text cell. The flattened
        # doc_anchor_pairs records are (id, text) and would feed ids to the tokenizer.
        inverted.df, _ = create_df_posting(doc_anchor_pairs_filter, "anchor", 10, num_buckets)
    else:
        # Write postings to the bucket named after this index so super_post reads them back.
        # NOTE(review): create_df_posting builds bigrams only for "title2"; any other name
        # (e.g. "title_pure") now yields a unigram title index — confirm the intent.
        inverted.df, _ = create_df_posting(doc_title_pairs, name, 0, num_buckets)

    # Merge the per-bucket posting locations, keeping only terms present in df.
    inverted.posting_locs = super_post(f"206065989_{name}", inverted.df)

    # write the global stats out
    inverted.write_index('.', f'index_{name}')
    # Upload to gs://206065989_{name}/postings_gcp/; unlike `!gsutil` inside a function,
    # this raises on failure instead of continuing silently.
    index_src = f"index_{name}.pkl"
    client.bucket(f"206065989_{name}").blob(f"postings_gcp/{index_src}").upload_from_filename(index_src)

In [None]:
# Build every index, one after another.
for index_name in ("body", "title", "title2", "anchor", "title_pure"):
    crete_index(index_name)

# PageRank

In [None]:
def generate_graph(pages):
  ''' Build the directed link graph of the wiki pages.

  Parameters:
  -----------
    pages: RDD
      One row per article, with 'id' (field 0) and 'anchor_text' (field 1). Each
      anchor entry's first element is the id of the page it links to.
  Returns:
  --------
    edges: RDD
      Distinct (source_id, target_id) pairs, one per link.
    vertices: RDD
      Distinct 1-tuples (vertex_id,) covering every source and every link target.
  '''
  def _links(page):
    source = page['id']
    return [(source, anchor[0]) for anchor in page['anchor_text']]

  edges = pages.flatMap(_links).distinct()
  sources = pages.map(lambda page: page[0])
  targets = pages.flatMap(lambda page: page[1]).map(lambda anchor: anchor[0])
  vertices = sources.union(targets).distinct().map(lambda vid: (vid,))
  return edges, vertices

In [None]:
# NOTE(review): left empty; any gs:// path built from bucket_name has no bucket.
bucket_name = ''
t_start = time()
pages_links = spark.read.parquet("gs://wikidata_preprocessed/*").select("id", "anchor_text").rdd
# construct the graph
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
# Collect (id, pagerank) to the driver once. The former toPandas() -> to_dict() ->
# pd.DataFrame round trip rebuilt an equivalent frame for no benefit.
pr = pr_results.vertices.select("id", "pagerank").toPandas()
q = dict(zip(pr.id, pr.pagerank))

# Page views

In [None]:
# Monthly (August 2021) "user" pageview dump from Wikimedia.
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
# Intermediate text file and final pickle, both named after the dump's stem.
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
 
# Download the dump (-N: skip the download unless the remote file is newer than the local one).
!wget -N $pv_path
 
# Keep English Wikipedia lines, take space-separated fields 3 and 5, and keep only lines
# where both fields are integers. NOTE(review): presumably field 3 is the page id and
# field 5 the view count — the next cell parses them as (id, views).
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
 
# Sum the view counts per page id; each line of pv_temp holds "<id> <views>".
wid2pv = Counter()
with open(pv_temp, 'rt') as pv_file:
  for row in pv_file:
    fields = row.split(' ')
    wid2pv[int(fields[0])] += int(fields[1])
# Persist the counter as a binary pickle.
with open(pv_clean, 'wb') as out_file:
  pickle.dump(wid2pv, out_file)

# Reporting

In [None]:
# size of index data
index_dst = f'gs://{bucket_name}/postings_gcp/'
!gsutil du -sh "$index_dst"