In [None]:
# Sanity check that this notebook can call Google Cloud APIs.
# If the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster.
# NOTE: only lists clusters in us-central1; adjust --region if the cluster lives elsewhere.
!gcloud dataproc clusters list --region us-central1

# Imports & Setup

In [None]:
!pip install -q google-cloud-storage==2.10.0
!pip install -q graphframes

In [None]:
# Show the google-cloud-storage version visible to the `pip` on PATH (driver side);
# the next cell checks what the Spark workers import.
!pip show google-cloud-storage

In [None]:
def check_gcs_version():
    """Return the google-cloud-storage version importable in the current Python process."""
    from google.cloud import storage as gcs
    return gcs.__version__

# Launch one tiny task per default-parallelism slot so the check is likely to reach
# every executor. Spark does not guarantee task placement, so this is best-effort.
versions_rdd = sc.parallelize(range(sc.defaultParallelism), numSlices=sc.defaultParallelism)
worker_versions = versions_rdd.map(lambda _: check_gcs_version()).collect()
print(f"Worker versions: {set(worker_versions)}")

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Download the NLTK stopword corpus; english_stopwords is built from it later.
nltk.download('stopwords')

In [None]:
# Verify the GraphFrames jar is present in Spark's jars directory.
# If nothing prints here you forgot to include the initialization script when starting the cluster.
!ls -l /usr/lib/spark/jars/graph*

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Sanity check: display the SparkSession named `spark` that this kernel provides.
spark

In [None]:
# GCS bucket holding the input parquet files and every index artifact; later cells
# read this name instead of hard-coding it.
bucket_name = "eyalir1"
client = storage.Client()
print(f"Using GCS bucket gs://{bucket_name}")

# Building an inverted index

In [None]:
# Load the Wikipedia dump from the configured bucket.
parquetFile = spark.read.parquet(f"gs://{bucket_name}/multistream*.parquet")
print("Successfully loaded parquet files")
print("Schema:")
parquetFile.printSchema()

# Count documents. Named num_docs (not `count`) so it does not shadow
# itertools.count imported in the setup cell.
num_docs = parquetFile.count()
print(f"Total documents: {num_docs:,}")

In [None]:
# If nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir.
# NOTE: %cd also changes the working directory for the rest of the notebook, so the
# '.'-relative index files and temp files written later land in /home/dataproc.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

In [None]:
# Add our python module to the cluster: sc.addFile distributes it with Spark jobs, and
# the SparkFiles root directory is put on the driver's import path.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
_spark_files_root = SparkFiles.getRootDirectory()
# Guard so re-running this cell does not keep prepending the same path.
if _spark_files_root not in sys.path:
    sys.path.insert(0, _spark_files_root)

In [None]:
from inverted_index_gcp import InvertedIndex

In [None]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "part", "including", "following",
                    "however", "would", "became"]

# Words that must never be treated as stopwords, even if one of the lists above
# (e.g. a future NLTK version) contains them.
words_to_keep = ["first", "history", "one", "two", "people", "may", "see", "second", "many"]
all_stopwords = english_stopwords.union(corpus_stopwords) - set(words_to_keep)
# A token is a word/#/@ character followed by 2-24 word characters, each optionally
# preceded by an apostrophe or hyphen (so every token has at least 3 characters).
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
  """Map a token to a bucket id in [0, NUM_BUCKETS) using its blake2b-based _hash."""
  return int(_hash(token), 16) % NUM_BUCKETS

print(f"Total stopwords: {len(all_stopwords)}")
# Derived from words_to_keep so the message cannot drift from the actual list.
print(f"Removed problematic stopwords: {', '.join(words_to_keep)}")

def word_count(text, id):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.
  Parameters:
  -----------
    text: str or None
      Text of one document. Missing text (None, e.g. a null parquet cell) or
      an empty string yields no entries instead of raising.
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs, one per distinct non-stopword
      token, in first-occurrence order. Tokens are lower-cased,
      for example: [("anarchism", (12, 5)), ...]
  '''
  if not text:
    return []
  tok_counts = Counter(m.group() for m in RE_WORD.finditer(text.lower()))
  return [(token, (id, tf)) for token, tf in tok_counts.items()
          if token not in all_stopwords]

def reduce_word_counts(unsorted_pl):
  ''' Turn an unordered collection of postings into a posting list sorted by wiki_id.
  Parameters:
  -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) tuples in any order.
  Returns:
  --------
    list of tuples
      The same tuples ordered by wiki_id (ties broken by tf, as tuples compare
      element by element).
  '''
  posting_list = list(unsorted_pl)
  posting_list.sort()
  return posting_list


def calculate_df(postings):
  ''' Derive the document frequency of every token from its posting list.
  Parameters:
  -----------
    postings: RDD
      Elements are (token, posting_list) pairs.
  Returns:
  --------
    RDD
      Elements are (token, df) pairs, where df is the posting-list length.
  Side effect: `postings` is marked for persistence because the callers in this
  notebook reuse it afterwards (to write the posting lists).
  '''
  postings.persist()
  return postings.map(lambda token_pl: (token_pl[0], len(token_pl[1])))


def partition_postings_and_write(postings, folder_name, bucket=None):
  ''' Partition the posting lists into buckets, write every bucket's posting
  lists to GCS, and return the posting locations for each bucket.
  Tokens are assigned to buckets with `token2bucket_id`; each bucket is written
  by `InvertedIndex.write_a_posting_list`, the static method implemented in
  inverted_index_gcp.py.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    folder_name: str
      Folder (object prefix) inside the bucket that the posting files go to.
    bucket: str, optional
      GCS bucket name. Defaults to the notebook-level `bucket_name`, read when
      this function is called.
  Returns:
  --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and
      offsets its posting list was written to. See `write_a_posting_list` for
      more details.
  '''
  target_bucket = bucket_name if bucket is None else bucket
  # Key every (token, posting_list) by its bucket id, gather each bucket's lists,
  # then write one bucket per task.
  return (postings
          .map(lambda w_pl: (token2bucket_id(w_pl[0]), w_pl))
          .groupByKey()
          .map(lambda b_w_pl: InvertedIndex.write_a_posting_list(b_w_pl, folder_name, target_bucket)))

print("All functions defined")


# INDEX 1: BUILDING THE BODY TEXT INDEX

In [None]:
print("BUILDING BODY TEXT INDEX")

t_start_body = time()

# Body terms must appear in more than this many documents to be indexed.
MIN_DF_BODY = 50

# Extract (text, id) pairs
doc_text_pairs = parquetFile.select("text", "id").rdd
print(f"Processing {doc_text_pairs.count():,} documents")

# Tokenize and count
print("Tokenizing body text...")
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)

# Filter: only keep terms that appear in more than MIN_DF_BODY documents
print(f"Filtering rare terms (df <= {MIN_DF_BODY})...")
postings_filtered = postings.filter(lambda x, min_df=MIN_DF_BODY: len(x[1]) > min_df)

# Calculate DF (calculate_df also persists postings_filtered, so the write below reuses it)
w2df_body = calculate_df(postings_filtered)
w2df_body_dict = w2df_body.collectAsMap()
print(f"Kept {len(w2df_body_dict):,} terms (df > {MIN_DF_BODY})")

# Write posting lists to gs://{bucket_name}/postings_gcp_body/ using the bucket
# configured in the setup cell (no local override).
print(f"Writing posting lists to gs://{bucket_name}/postings_gcp_body/...")
_ = partition_postings_and_write(postings_filtered, 'postings_gcp_body').collect()
# Release the cached body postings before building the next index.
postings_filtered.unpersist()

body_index_time = time() - t_start_body
print(f"Body index created in {body_index_time/60:.1f} minutes")

In [None]:
# Re-create the GCS client for interactive use. The posting-location cells below shell
# out to gsutil and do not reference it, so no progress message is printed here.
client = storage.Client()

In [None]:
import subprocess
import os

print("Collecting posting locations...")
super_posting_locs_body = defaultdict(list)

# List all pickle files. check=True makes a failed listing (e.g. a glob matching no
# objects) raise instead of silently producing an index with no posting locations.
result = subprocess.run(['gsutil', 'ls', f'gs://{bucket_name}/postings_gcp_body/*.pickle'],
                        capture_output=True, text=True, check=True)
pickle_files = [line.strip() for line in result.stdout.splitlines() if line.strip()]

# Download each bucket's posting locations and merge them (word -> list of locations).
local_file = 'temp_posting.pickle'
for gs_path in pickle_files:
    subprocess.run(['gsutil', 'cp', gs_path, local_file], check=True)
    try:
        with open(local_file, 'rb') as f:
            # Files were written by our own Spark job; pickle is trusted here.
            posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_body[k].extend(v)
    finally:
        os.remove(local_file)

print(f"Collected {len(super_posting_locs_body):,} posting locations")

In [None]:
# Create and save body index object: attach the merged posting locations and the
# df dictionary, then serialize the index to the current directory.
inverted_body = InvertedIndex()
inverted_body.posting_locs = super_posting_locs_body
inverted_body.df = w2df_body_dict
inverted_body.write_index('.', 'index_body')

# Upload to bucket, next to the body posting files (presumably write_index produced
# ./index_body.pkl, the name copied here — confirm against inverted_index_gcp.py).
# NOTE(review): the messages below print even if the gsutil upload failed.
!gsutil cp index_body.pkl gs://{bucket_name}/postings_gcp_body/
print(f"Body index saved to gs://{bucket_name}/postings_gcp_body/index_body.pkl")
print("="*80)
print("BODY INDEX COMPLETE")
print("="*80)

# INDEX 2: TITLE INDEX

In [None]:
print("="*80)
print("BUILDING TITLE INDEX")
print("="*80)

t_start_title = time()

# Extract (title, id) pairs
doc_title_pairs = parquetFile.select("title", "id").rdd
print(f"Processing {doc_title_pairs.count():,} titles")

# Tokenize titles
print("Tokenizing titles...")
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings_title = word_counts_title.groupByKey().mapValues(reduce_word_counts)

# NO FILTERING - keep all terms
print("No filtering for titles - keeping all terms")

# Calculate DF (calculate_df also persists postings_title, so the write below reuses it)
w2df_title = calculate_df(postings_title)
w2df_title_dict = w2df_title.collectAsMap()
print(f"Kept {len(w2df_title_dict):,} title terms")

# Write posting lists to gs://{bucket_name}/postings_gcp_title/
print(f"Writing posting lists to gs://{bucket_name}/postings_gcp_title/...")
_ = partition_postings_and_write(postings_title, 'postings_gcp_title').collect()
# Release the cached title postings now that they have been written.
postings_title.unpersist()

title_index_time = time() - t_start_title
print(f"Title index created in {title_index_time/60:.1f} minutes")

In [None]:
# Collect posting locations
print("Collecting posting locations...")
super_posting_locs_title = defaultdict(list)

# check=True makes a failed listing (e.g. a glob matching no objects) raise instead of
# silently producing an index with no posting locations.
result = subprocess.run(['gsutil', 'ls', f'gs://{bucket_name}/postings_gcp_title/*.pickle'],
                        capture_output=True, text=True, check=True)
pickle_files = [line.strip() for line in result.stdout.splitlines() if line.strip()]

# Download each bucket's posting locations and merge them (word -> list of locations).
local_file = 'temp_posting.pickle'
for gs_path in pickle_files:
    subprocess.run(['gsutil', 'cp', gs_path, local_file], check=True)
    try:
        with open(local_file, 'rb') as f:
            # Files were written by our own Spark job; pickle is trusted here.
            posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_title[k].extend(v)
    finally:
        os.remove(local_file)

print(f"Collected {len(super_posting_locs_title):,} posting locations")

In [None]:
# Create and save title index object: attach the merged posting locations and the
# df dictionary, then serialize the index to the current directory.
inverted_title = InvertedIndex()
inverted_title.posting_locs = super_posting_locs_title
inverted_title.df = w2df_title_dict
inverted_title.write_index('.', 'index_title')

# Upload to bucket, next to the title posting files (presumably write_index produced
# ./index_title.pkl, the name copied here — confirm against inverted_index_gcp.py).
# NOTE(review): the messages below print even if the gsutil upload failed.
!gsutil cp index_title.pkl gs://{bucket_name}/postings_gcp_title/
print(f"✅ Title index saved to gs://{bucket_name}/postings_gcp_title/index_title.pkl")
print("="*80)
print("✅ TITLE INDEX COMPLETE")
print("="*80)

# INDEX 3: ANCHOR TEXT INDEX

In [None]:
print("="*80)
print("BUILDING ANCHOR TEXT INDEX")
print("="*80)

t_start_anchor = time()

# Largest tf kept per (token, page) after summing over all links to that page.
# NOTE(review): guard in case InvertedIndex.write_a_posting_list packs tf into a fixed
# 16-bit field (a popular page can receive far more links than that) — confirm against
# inverted_index_gcp.py and drop the cap if the on-disk format is unbounded.
ANCHOR_TF_CAP = 2**16 - 1

# Define function to process anchor_text field
def process_anchor_text(row):
  """Credit each outgoing link's anchor text to the page the link points to.

  Each element in row.anchor_text is a struct with:
    - id: the TARGET page id that the link points to
    - text: the anchor text of the link
  Links with no text or no target id are skipped.

  Returns: List of (token, (target_id, tf)) tuples, where tf counts the token
  within one link's text. The same (token, target_id) pair is emitted once per
  link, so callers must aggregate before building posting lists.
  """
  results = []
  if row.anchor_text is None:
    return results

  for link in row.anchor_text:
    # A None target id is not a usable document id for a posting list.
    if link.text is None or link.id is None:
      continue
    # Same tokenization and stopword filtering as body and title text.
    results.extend(word_count(link.text, link.id))

  return results

def _sum_anchor_tf(pairs, tf_cap=ANCHOR_TF_CAP):
  """Merge (token, (doc_id, tf)) pairs so each (token, doc_id) occurs once with its summed tf, capped at tf_cap."""
  return (pairs
          .map(lambda x: ((x[0], x[1][0]), x[1][1]))
          .reduceByKey(lambda a, b: a + b)
          .map(lambda x: (x[0][0], (x[0][1], min(x[1], tf_cap)))))

# Process all anchor text. Many pages link to the same target with the same words,
# so tfs are summed per (token, target page): every posting list then holds each doc
# id at most once and df counts distinct pages rather than links.
print("Processing anchor text...")
anchor_pairs = _sum_anchor_tf(parquetFile.select("anchor_text").rdd.flatMap(process_anchor_text))
postings_anchor = anchor_pairs.groupByKey().mapValues(reduce_word_counts)
print("Anchor text extracted")

# Calculate DF (calculate_df also persists postings_anchor, so the write below reuses it)
w2df_anchor = calculate_df(postings_anchor)
w2df_anchor_dict = w2df_anchor.collectAsMap()
print(f"Kept {len(w2df_anchor_dict):,} anchor terms")

# Write posting lists to gs://{bucket_name}/postings_gcp_anchor/
print(f"Writing posting lists to gs://{bucket_name}/postings_gcp_anchor/...")
_ = partition_postings_and_write(postings_anchor, 'postings_gcp_anchor').collect()
# Release the cached anchor postings now that they have been written.
postings_anchor.unpersist()

anchor_index_time = time() - t_start_anchor
print(f"Anchor index created in {anchor_index_time/60:.1f} minutes")

In [None]:
# Collect posting locations
print("Collecting posting locations...")
super_posting_locs_anchor = defaultdict(list)

# check=True makes a failed listing (e.g. a glob matching no objects) raise instead of
# silently producing an index with no posting locations.
result = subprocess.run(['gsutil', 'ls', f'gs://{bucket_name}/postings_gcp_anchor/*.pickle'],
                        capture_output=True, text=True, check=True)
pickle_files = [line.strip() for line in result.stdout.splitlines() if line.strip()]

# Download each bucket's posting locations and merge them (word -> list of locations).
local_file = 'temp_posting.pickle'
for gs_path in pickle_files:
    subprocess.run(['gsutil', 'cp', gs_path, local_file], check=True)
    try:
        with open(local_file, 'rb') as f:
            # Files were written by our own Spark job; pickle is trusted here.
            posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_anchor[k].extend(v)
    finally:
        os.remove(local_file)

print(f"Collected {len(super_posting_locs_anchor):,} posting locations")

In [None]:
# Create and save anchor index object: attach the merged posting locations and the
# df dictionary, then serialize the index to the current directory.
inverted_anchor = InvertedIndex()
inverted_anchor.posting_locs = super_posting_locs_anchor
inverted_anchor.df = w2df_anchor_dict
inverted_anchor.write_index('.', 'index_anchor')

# Upload to bucket, next to the anchor posting files (presumably write_index produced
# ./index_anchor.pkl, the name copied here — confirm against inverted_index_gcp.py).
# NOTE(review): the messages below print even if the gsutil upload failed.
!gsutil cp index_anchor.pkl gs://{bucket_name}/postings_gcp_anchor/
print(f"Anchor index saved to gs://{bucket_name}/postings_gcp_anchor/index_anchor.pkl")
print("="*80)
print("ANCHOR INDEX COMPLETE")
print("="*80)

In [None]:
print("="*80)
print("CREATING DOC ID → TITLE MAPPING")
print("="*80)

# Write the (id, title) columns as a single-file parquet dataset in the bucket
print("Writing mapping as parquet...")
parquetFile.select("id", "title").repartition(1).write.mode('overwrite').parquet(f'gs://{bucket_name}/doc_title_mapping.parquet')
print(f"Saved to gs://{bucket_name}/doc_title_mapping.parquet")

# Read that parquet back and collect it on the driver as an {id: title} dict.
# NOTE: this holds the whole mapping in driver memory.
print("Collecting to pickle format...")
doc_title_mapping = spark.read.parquet(f'gs://{bucket_name}/doc_title_mapping.parquet').rdd.collectAsMap()
print(f"Collected {len(doc_title_mapping):,} mappings")
# Save to pickle for convenience and upload it to the bucket root
with open('doc_title_mapping.pkl', 'wb') as f:
    pickle.dump(dict(doc_title_mapping), f)
!gsutil cp doc_title_mapping.pkl gs://{bucket_name}/
print(f"Saved pickle to gs://{bucket_name}/doc_title_mapping.pkl")
print("="*80)
print("DOC ID → TITLE MAPPING COMPLETE")
print("="*80)

# PageRank

In [None]:
def generate_graph(pages):
  '''Compute the directed graph generated by wiki links.
  Parameters:
  -----------
    pages: RDD
      An RDD where each row is one Wikipedia article with 'id' and
      'anchor_text' fields. 'anchor_text' is a list of links, each with the
      'id' of the page it points to; it may be None for a page without links.
  Returns:
  --------
    edges: RDD
      An RDD of (source_id, destination_id) pairs, one per directed link,
      with no duplicates. Links whose target id is None are dropped.
    vertices: RDD
      An RDD of 1-tuples (page_id,), one per vertex (every source page and
      every link target), with no duplicates.
  '''
  def _page_edges(page):
    # A page whose anchor_text is None contributes no edges instead of raising
    # TypeError when iterated.
    links = page.anchor_text or []
    return [(page.id, link.id) for link in links if link.id is not None]

  edges = pages.flatMap(_page_edges).distinct()
  # edges is consumed twice: for the destinations below and by the caller.
  edges.persist()

  sources = pages.map(lambda page: page.id)
  destinations = edges.map(lambda edge: edge[1])
  vertices = sources.union(destinations).distinct().map(lambda page_id: (page_id,))

  return edges, vertices

In [None]:
print("="*80)
print("COMPUTING PAGERANK")
print("="*80)

t_start_pr = time()

# PageRank settings (the progress message below reads PR_MAX_ITER so it cannot drift).
PR_MAX_ITER = 4
PR_RESET_PROB = 0.15
PR_NUM_PARTITIONS = 124

# Load data
pages_links = spark.read.parquet(f"gs://{bucket_name}/multistream*.parquet").select("id", "anchor_text").rdd
print("Loaded anchor_text data")

# Build graph
print("Building graph...")
edges, vertices = generate_graph(pages_links)
print(f"Graph: {vertices.count():,} vertices, {edges.count():,} edges")

# Compute PageRank
print(f"Computing PageRank ({PR_MAX_ITER} iterations)...")
edgesDF = edges.toDF(['src', 'dst']).repartition(PR_NUM_PARTITIONS, 'src')
verticesDF = vertices.toDF(['id']).repartition(PR_NUM_PARTITIONS, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=PR_RESET_PROB, maxIter=PR_MAX_ITER)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())

# Save. coalesce(1) merges the sorted partitions in order; repartition(1) shuffles
# and can lose the descending-pagerank order in the output file.
print("Saving PageRank...")
pr.coalesce(1).write.mode('overwrite').csv(f'gs://{bucket_name}/pr', compression="gzip")

pr_time = time() - t_start_pr
print(f"PageRank computed in {pr_time/60:.1f} minutes")
print("\nTop 20 pages by PageRank:")
pr.show(20)

print("="*80)
print("PAGERANK COMPLETE")
print("="*80)

# Reporting

In [None]:
import pickle
from google.cloud import storage

bucket_name = 'eyalir1'

# Check files exist
print("Checking files in GCS...")
!gsutil ls gs://{bucket_name}/postings_gcp_body/index_body.pkl
!gsutil ls gs://{bucket_name}/postings_gcp_title/index_title.pkl
!gsutil ls gs://{bucket_name}/postings_gcp_anchor/index_anchor.pkl
!gsutil ls gs://{bucket_name}/doc_title_mapping.pkl
!gsutil ls gs://{bucket_name}/pr/*.csv.gz

print("\n" + "="*80)
print("Loading and validating indices...")
print("="*80)

# Download and load each index
!gsutil cp gs://{bucket_name}/postings_gcp_body/index_body.pkl .
!gsutil cp gs://{bucket_name}/postings_gcp_title/index_title.pkl .
!gsutil cp gs://{bucket_name}/postings_gcp_anchor/index_anchor.pkl .
!gsutil cp gs://{bucket_name}/doc_title_mapping.pkl .

from inverted_index_gcp import InvertedIndex

# Load and check body index
idx_body = InvertedIndex.read_index('.', 'index_body')
print(f"\n1. Body Index:")
print(f"   - Terms: {len(idx_body.df):,}")
print(f"   - Posting locations: {len(idx_body.posting_locs):,}")

# Load and check title index
idx_title = InvertedIndex.read_index('.', 'index_title')
print(f"\n2. Title Index:")
print(f"   - Terms: {len(idx_title.df):,}")
print(f"   - Posting locations: {len(idx_title.posting_locs):,}")

# Load and check anchor index
idx_anchor = InvertedIndex.read_index('.', 'index_anchor')
print(f"\n3. Anchor Index:")
print(f"   - Terms: {len(idx_anchor.df):,}")
print(f"   - Posting locations: {len(idx_anchor.posting_locs):,}")

# Load and check doc-title mapping
with open('doc_title_mapping.pkl', 'rb') as f:
    doc_title_mapping = pickle.load(f)
print(f"\n4. Doc-Title Mapping:")
print(f"   - Entries: {len(doc_title_mapping):,}")

# Check PageRank file
!gsutil du -sh gs://{bucket_name}/pr/

print("\n" + "="*80)
print("✅ ALL INDICES VALIDATED!")
print("="*80)

In [None]:
# Check sizes: summarized, human-readable storage used by each posting-list folder
print("\nIndex Sizes:")
!gsutil du -sh gs://{bucket_name}/postings_gcp_body/
!gsutil du -sh gs://{bucket_name}/postings_gcp_title/
!gsutil du -sh gs://{bucket_name}/postings_gcp_anchor/