In [1]:
# Show the Dataproc clusters in us-central1 (informational only; nothing below reads this output).
!gcloud dataproc clusters list --region us-central1
# Install the Google Cloud Storage client at a pinned version; -q keeps pip quiet.
!pip install -q google-cloud-storage==1.43.0
# NOTE(review): graphframes is installed without a version pin, so a re-run may pick up a
# different release — consider pinning it, and using %pip so the kernel's environment is targeted.
!pip install -q graphframes

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-84ed  GCE       4                                             RUNNING  us-central1-c
[0m

In [2]:
import os
import re
import sys
import pickle
import hashlib
from time import time
from collections import Counter, defaultdict
from itertools import groupby
from operator import itemgetter
from pathlib import Path

import pandas as pd
import numpy as np
import nltk
from nltk.corpus import stopwords
from nltk.stem.porter import *
from operator import add

from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *
from nltk.stem.porter import *

In [3]:
# Initialize Spark session
# getOrCreate() attaches to a session/context that already exists instead of building a new one.
spark = SparkSession.builder.appName("InvertedIndex").getOrCreate()
sc = SparkContext.getOrCreate()

# Register the helper module with the Spark job, then make the SparkFiles root
# directory importable on the driver so the import below can resolve it.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0, SparkFiles.getRootDirectory())

# Must stay below the two lines above: the module is only importable once the
# SparkFiles root directory is on sys.path.
from inverted_index_gcp import InvertedIndex

24/03/10 14:33:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Download NLTK stopwords
nltk.download('stopwords')

# Modulus used by token2bucket_id to map a token hash to a bucket id.
NUM_BUCKETS = 124

# Set up stopwords
english_stopwords = set(stopwords.words('english'))
# Extra words removed on top of the NLTK English list.
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

# Combined set that tokenize() checks every candidate token against.
all_stopwords = english_stopwords.union(corpus_stopwords)
# Token pattern: one leading '#', '@' or word character, followed by 2 to 24
# word characters, each optionally preceded by an apostrophe or a hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [5]:
# Constants
PROJECT_NAME = 'ir-project-415515'
# Bucket that holds the input files and receives the index output.
BUCKET_NAME = 'irproj_26051997'
# NOTE(review): stemmer is not referenced by any of the visible cells — confirm it is still needed.
stemmer = PorterStemmer()

In [6]:
# Functions
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()


def token2bucket_id(token):
    """Map ``token`` to a bucket id in ``range(NUM_BUCKETS)`` using its hash."""
    hash_value = int(_hash(token), 16)  # hex digest -> integer
    return hash_value % NUM_BUCKETS

def tokenize(text):
    """Split ``text`` into lower-cased tokens, dropping stopwords.

    Tokens are the matches of ``RE_WORD`` on the lower-cased text; a match is
    discarded when it appears in ``all_stopwords``.
    """
    kept_tokens = []
    for match in RE_WORD.finditer(text.lower()):
        word = match.group()
        if word not in all_stopwords:
            kept_tokens.append(word)
    return kept_tokens

def word_count(id, tokens, idx):
    """Count term occurrences in one document.

    Args:
        id: Document identifier attached to every emitted pair.
        tokens: Iterable of tokens belonging to the document.
        idx: Object with a numeric ``num_docs`` attribute; it is incremented
            by one per call.

    Returns:
        A list of ``(token, (id, count))`` tuples, one per distinct token.
    """
    frequencies = Counter(tokens)
    pairs = []
    for term, freq in frequencies.items():
        pairs.append((term, (id, freq)))
    # NOTE(review): when this runs inside a Spark transformation, idx is
    # presumably a serialized copy, so this increment would not reach the
    # driver-side object — verify before relying on num_docs.
    idx.num_docs += 1
    return pairs

def reduce_word_counts(unsorted_pl):
    """Return the posting list as a new list ordered by each entry's first element.

    The sort is stable, so entries that share a first element keep their
    incoming order.
    """
    ordered = list(unsorted_pl)
    ordered.sort(key=itemgetter(0))
    return ordered


def calculate_df(postings):
    """Map each ``(token, posting_list)`` entry to ``(token, df)``.

    ``df`` is the length of the posting list, i.e. the number of documents in
    which the term occurred.
    """
    def _to_df(entry):
        term, posting_list = entry[0], entry[1]
        return (term, len(posting_list))

    return postings.map(_to_df)


def partition_postings_and_write(postings, index):
    """Group posting lists by bucket id and hand each bucket to the index writer.

    Args:
        postings: RDD of ``(token, posting_list)`` pairs.
        index: Index object whose ``base_dir`` is passed to the writer.

    Returns:
        An RDD holding the result of ``InvertedIndex.write_a_posting_list`` for
        every bucket. Nothing is written until an action is run on it.
    """
    def _tag_with_bucket(item):
        # Key each (token, posting_list) pair by the bucket its token hashes to.
        return (token2bucket_id(item[0]), item)

    def _write_bucket(bucket):
        return InvertedIndex.write_a_posting_list(bucket, index.base_dir, BUCKET_NAME)

    bucketed = postings.map(_tag_with_bucket).groupByKey()
    return bucketed.map(_write_bucket)


def create_anchor_list(page):
    """Pair a page's id with the second field of each of its anchors.

    Args:
        page: Indexable record whose first item is the document id and whose
            second item is an iterable of anchor records.

    Returns:
        A list of ``(doc_id, anchor[1])`` tuples, one per anchor.
    """
    doc_id = page[0]
    pairs = []
    # NOTE(review): anchor[0] is ignored; presumably it is the link target id —
    # verify which id the anchor index is meant to be keyed on.
    for anchor in page[1]:
        pairs.append((doc_id, anchor[1]))
    return pairs


def create_index(doc_pairs, directory, filter_tf=False, gcs_client=None, df_threshold=50):
    """Build an ``InvertedIndex`` from an RDD of ``(doc_id, text)`` pairs.

    Posting lists are handed to ``partition_postings_and_write``; the posting
    locations are then read back from the ``*pickle`` blobs found under
    ``directory`` in ``BUCKET_NAME``. Global statistics (df, term totals,
    document lengths, document count, average document length) are attached to
    the returned index.

    Args:
        doc_pairs: RDD of ``(doc_id, text)`` pairs.
        directory: Prefix inside ``BUCKET_NAME``; used as the index
            ``base_dir`` and as the prefix when listing posting-location blobs.
        filter_tf: When True, keep only terms whose posting list has more
            than ``df_threshold`` entries.
        gcs_client: Optional ``google.cloud.storage.Client``. When omitted a
            new client is created, so the function does not depend on a
            notebook-level ``client`` variable having been defined first.
        df_threshold: Posting-list length a term must exceed to survive the
            ``filter_tf`` filter. Defaults to 50.

    Returns:
        The populated ``InvertedIndex``.
    """
    if gcs_client is None:
        # Imported here so the fallback works regardless of which notebook
        # cells have already been executed.
        from google.cloud import storage
        gcs_client = storage.Client()

    inverted = InvertedIndex(base_dir=directory)

    tokenized_pairs = doc_pairs.map(lambda pair: (pair[0], tokenize(pair[1])))
    doc_lengths = tokenized_pairs.mapValues(len).collectAsMap()
    print("Done tokenization and doc lengths calculation")

    # Calculate word counts and filter
    word_counts = tokenized_pairs.flatMap(lambda x: word_count(x[0], x[1], inverted))
    postings = word_counts.groupByKey().mapValues(reduce_word_counts)
    # NOTE(review): df is collected BEFORE the optional filter below, so it
    # also covers terms whose posting lists are never written, while
    # term_total is computed from the filtered postings. Confirm this is intended.
    w2df_dict = calculate_df(postings).collectAsMap()

    if filter_tf:
        postings = postings.filter(lambda x: len(x[1]) > df_threshold)

    print("Done posting lists creation and df creation")

    # collect() is only used to force the write; its result is not needed here.
    partition_postings_and_write(postings, inverted).collect()

    # Collect all posting lists locations into one super-set
    super_posting_locs = defaultdict(list)
    for blob in gcs_client.list_blobs(BUCKET_NAME, prefix=directory):
        if not blob.name.endswith("pickle"):
            continue
        with blob.open("rb") as f:
            # SECURITY: pickle.load executes arbitrary code from the file; this
            # is only acceptable while the bucket contents are fully trusted.
            posting_locs = pickle.load(f)
            for k, v in posting_locs.items():
                super_posting_locs[k].extend(v)

    print("Done creating a posting locs list")

    # Create and configure InvertedIndex instance
    inverted.posting_locs = super_posting_locs
    print("Saved posting locs")

    inverted.df.update(w2df_dict)
    print("Updated df")

    inverted.term_total.update(
        postings.flatMapValues(lambda x: x)
        .map(lambda x: (x[0], x[1][1]))
        .reduceByKey(add)
        .collectAsMap()
    )
    print("Updated tf")

    inverted.doc_lengths.update(doc_lengths)
    print("Updated doc_lengths")

    # word_count's increment runs inside a Spark transformation on a copy of
    # the index, so the document count is set explicitly on the driver here.
    inverted.num_docs = len(doc_lengths)

    # Guard the empty corpus: np.mean of an empty sequence yields NaN plus a warning.
    inverted.avg_doc_length = np.mean(list(doc_lengths.values())) if doc_lengths else 0.0
    print("Calculated average doc length")

    return inverted

In [7]:
# Main function
# Builds the anchor-text index: list the input blobs, load them as parquet,
# concatenate each page's anchor texts and pass the result to create_index.
client = storage.Client()

# Input paths: every blob under wiki_files/ except the two names excluded below.
full_path = f"gs://{BUCKET_NAME}/"
excluded_blob_names = ['wiki_files/graphframes.sh', 'wiki_files/']
blobs = [
    b
    for b in client.list_blobs(BUCKET_NAME, prefix='wiki_files/')
    if b.name not in excluded_blob_names
]
paths = [f"{full_path}{b.name}" for b in blobs]

# Read parquet files
parquetFile = spark.read.parquet(*paths)

# One (doc_id, "text text ...") pair per page: all of its anchor texts joined with spaces.
doc_anchor_pairs = (
    parquetFile.select("id", "anchor_text").rdd
    .flatMap(create_anchor_list)
    .groupByKey()
    .mapValues(list)
    .map(lambda x: (x[0], " ".join(x[1])))
)
inverted_anchor = create_index(doc_anchor_pairs, 'indices/anchor_index/postings_anchor_gcp/', True)
print('Created anchor index')

24/03/10 14:41:43 WARN YarnAllocator: Container from a bad node: container_1709977868329_0043_01_000006 on host: cluster-84ed-w-1.c.ir-project-415515.internal. Exit status: 143. Diagnostics: [2024-03-10 14:41:42.577]Container killed on request. Exit code is 143
[2024-03-10 14:41:42.577]Container exited with a non-zero exit code 143. 
[2024-03-10 14:41:42.578]Killed by external signal
.
24/03/10 14:41:43 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 5 for reason Container from a bad node: container_1709977868329_0043_01_000006 on host: cluster-84ed-w-1.c.ir-project-415515.internal. Exit status: 143. Diagnostics: [2024-03-10 14:41:42.577]Container killed on request. Exit code is 143
[2024-03-10 14:41:42.577]Container exited with a non-zero exit code 143. 
[2024-03-10 14:41:42.578]Killed by external signal
.
24/03/10 14:41:43 ERROR YarnScheduler: Lost executor 5 on cluster-84ed-w-1.c.ir-project-415515.internal: Container from a bad node: container_1

Done tokenization and doc lengths calculation


24/03/10 14:48:58 WARN YarnAllocator: Container from a bad node: container_1709977868329_0043_01_000002 on host: cluster-84ed-w-1.c.ir-project-415515.internal. Exit status: 143. Diagnostics: [2024-03-10 14:48:57.856]Container killed on request. Exit code is 143
[2024-03-10 14:48:57.857]Container exited with a non-zero exit code 143. 
[2024-03-10 14:48:57.857]Killed by external signal
.
24/03/10 14:48:58 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container from a bad node: container_1709977868329_0043_01_000002 on host: cluster-84ed-w-1.c.ir-project-415515.internal. Exit status: 143. Diagnostics: [2024-03-10 14:48:57.856]Container killed on request. Exit code is 143
[2024-03-10 14:48:57.857]Container exited with a non-zero exit code 143. 
[2024-03-10 14:48:57.857]Killed by external signal
.
24/03/10 14:48:58 ERROR YarnScheduler: Lost executor 2 on cluster-84ed-w-1.c.ir-project-415515.internal: Container from a bad node: container_1

Done posting lists creation and df creation


                                                                                

Done creating a posting locs list
Saved posting locs
Updated df


                                                                                

Updated tf
Updated doc_lengths
Calculated average doc length
Created anchor index


In [9]:
# Recount the tokens of every document's anchor text, then replace the index's
# document-length map and derive the document count from it.
doc_token_counts_anchor = doc_anchor_pairs.mapValues(lambda text: len(tokenize(text)))
inverted_anchor.doc_lengths = doc_token_counts_anchor.collectAsMap()
inverted_anchor.num_docs = len(inverted_anchor.doc_lengths)

                                                                                

In [10]:
# Write global stats and upload to Google Storage
# write_index is expected to produce ./anchor_index.pkl, the file copied below.
inverted_anchor.write_index('.', 'anchor_index')
index_src = "anchor_index.pkl"
# Destination sits under the same prefix that was passed to create_index.
index_dst = f'gs://{BUCKET_NAME}/indices/anchor_index/postings_anchor_gcp/{index_src}'
!gsutil cp $index_src $index_dst
# NOTE(review): this message is printed unconditionally — a failed gsutil copy
# does not stop the cell, so check the gsutil output above.
print('Anchor index saved to bucket successfully')

Copying file://anchor_index.pkl [Content-Type=application/octet-stream]...
- [1 files][ 82.6 MiB/ 82.6 MiB]                                                
Operation completed over 1 objects/82.6 MiB.                                     
Anchor index saved to bucket successfully


In [None]:
# Shut down the Spark session once the index has been written and uploaded.
spark.stop()