# Imports & Setup

In [1]:
# if the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1
# !gcloud dataproc clusters list --region us-south1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-54e1  GCE       4                                             RUNNING  us-central1-a


In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
# PySpark:
import pyspark

# Standard library imports
import hashlib
import itertools
import os
import pickle
import re
import string
from dateutil import parser
import math
from math import log
import sys
from collections import Counter, OrderedDict, defaultdict
from itertools import islice, count, groupby
from operator import itemgetter
from pathlib import Path
from time import time

# Third-party imports
import pandas as pd
from google.cloud import storage
import numpy as np

# NLTK downloads (consider moving these to a setup script or documentation on required corpora)
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.stem.porter import PorterStemmer
from nltk.stem.porter import *
from nltk.corpus import stopwords
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')
nltk.data.path.append('/root/nltk_data')

# Function definition
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

[nltk_data] Downloading package stopwords to /usr/share/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /usr/share/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [4]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar 10 12:08 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [6]:
# Print executor and driver memory settings
print("Executor Memory:", spark.sparkContext.getConf().get("spark.executor.memory"))
print("Driver Memory:", spark.sparkContext.getConf().get("spark.driver.memory"))
print("Executor Memory Overhead:", spark.sparkContext.getConf().get("spark.executor.memoryOverhead"))
print("Driver Memory Overhead:", spark.sparkContext.getConf().get("spark.driver.memoryOverhead"))
print("Driver Max Result Size:", spark.sparkContext.getConf().get("spark.driver.maxResultSize"))

Executor Memory: 2893m
Driver Memory: 2048m
Executor Memory Overhead: None
Driver Memory Overhead: None
Driver Max Result Size: 1024m


In [6]:
spark

In [7]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = 'shahar_ir_project'
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name != 'init_file.sh' and b.name != 'graphframes.sh' and not b.name.startswith("PostingList_") and not b.name.startswith("BigDataFiles") and not b.name.startswith("Index"):
        paths.append(full_path+b.name)

# Building an inverted index (Test, small)

In [8]:
# paths = paths[:5]
parquetFile = spark.read.parquet(*paths)

                                                                                

In [9]:
# Get N - number of documents in indices
N = parquetFile.count()
N

                                                                                

6348910

In [10]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [11]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [12]:
from inverted_index_gcp import InvertedIndex

**Extracting RDDs from the source files**


In [13]:
doc_text_pairs_title = parquetFile.select("title", "id").rdd
doc_text_pairs_body = parquetFile.select("text", "id").rdd

# Create {ID:Title} dictionary using spark for retrieval on query
id_title_rdd = doc_text_pairs_title

                                                                                

**Parsing Functions**

In [14]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    'category', 'references', 'also', 'links', 'extenal', 'see',"links",
                    "may", "first","history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became", '.', ',', '?', '!', ':', ';', '/', '\\', '-', '"', "'", "(", ")",
    "[", "]", "{", "}", "|", "*", "+", "@", "^", "&", "%", "#", "''",'``','...', '',' ', None
]
corpus_stopwords = set(corpus_stopwords + list(string.punctuation))

ALL_STOPWORDS = english_stopwords.union(corpus_stopwords)
lemmatizer = WordNetLemmatizer()
stemmer = PorterStemmer()

def replace_contractions(token):
    """
    Replace common contractions like "n't" with their full words.
    
    Parameters:
        tokens (list of str): List of tokens.
        
    Returns:
        list of str: List of tokens with contractions replaced.
    """
    contractions_mapping = {
        "n't": "not",
        "'nt": "not",
        "'ll": "will",
        "'ve": "have",
        "'re": "are",
        "'d": "would",
        "'m": "am",
        ".": "",
        "'s": "is"
    }
    
    for key in contractions_mapping.keys():
        if key in token:
            i = token.index(key)
            if key == ".":
                return [token[:i], token[i+1:]]
            return [token[:i], contractions_mapping[key]]
    return [token]


def ParseDateFromToken(token):
    def is_likely_date(token):
        # Check for delimiters typically found in numeric shaped dates
        if re.search(r'(\d{1,4}[- /.]\d{1,2}[- /.]\d{1,4})', token):
            return True
        return False

    if not is_likely_date(token):
        return None

    try:
        parsed_date = parser.parse(token, ignoretz=True)
        
        # Validate year range to prevent OverflowError
        if parsed_date.year < 1 or parsed_date.year > 9999:
            return None

        # Further validation to ignore times without explicit dates
        if parsed_date.hour != 0 or parsed_date.minute != 0 or parsed_date.second != 0:
            # This means the token was more like a time than a date
            return None

        year = parsed_date.year
        full_month_name = parsed_date.strftime("%B")
        return str(year), full_month_name
    except (ValueError, OverflowError, TypeError):
        return None

def PreProcessText(text):
    '''
    Parse input text. Titles - all lowered, body - keeps capitalization under conditions.
    Out is a final list of lemmatized tokens.
    '''
    in_tokens = word_tokenize(text)
    in_tokens = [token.strip(string.punctuation) for token in in_tokens]

    tok_tmp = []
    for token in in_tokens:
      tmp = replace_contractions(token)
      tok_tmp.extend(tmp)
   
    tokens = [token.lower() for token in tok_tmp if token not in ALL_STOPWORDS]
    #     tok_tmp = []
    #     for token in tokens:
    #         date_data = ParseDateFromToken(token)  # list of 2 or None
    #         if date_data is not None:
    #             tok_tmp.extend(date_data)

    #     tokens.extend(tok_tmp)
    tokens = [lemmatizer.lemmatize(token) for token in tokens]
    return tokens

**Parsing Text**

In [15]:
def ParseDocumentToPosting(text, id):
    ''' Count the frequency of each word in `text` (tf) that is not included in
    `ALL_STOPWORDS` and return entries that will go into our posting lists.
    This is the basis of the posting list - creating a posting list for 1 document with basic TF.
    Build enviroment parameter and term vocabulaty in the process.
    Parameters:
    -----------
    text: str
      Text of one document
    id: int
      Document id
    Returns:
    --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs
      for example: [("Anarchism", (12, 5)), ...]
    '''
    # Preprocess the entire test
    tokens = PreProcessText(text)

    # Count the term frequency of each token
    tf = Counter(tokens)

    # Create the list of tuples (token, (doc_id, tf) - will be replaced down the line
    result = [(token, (id, freq)) for token, freq in tf.items()]
    return result


def ParseDocumentToDocSize(text, id):
    ''' Get the doc size of parsed tokens in the document that is not included in
    `ALL_STOPWORDS` and return entries that will go into our Index.
    Parameters:
    -----------
    text: str
      Text of one document
    id: int
      Document id
    InvertedIndexType: str
      Type of index ['title', 'body']
    Returns:
    --------
    Tuple (for RDD)
      (doc_id: len(tokens))
    '''
    # Preprocess the entire test
    tokens = PreProcessText(text)
    n_terms = len(tokens)

    # Return DocSize dict {doc_id: len(tokens)}

    return (id, n_terms)

**Reduce Output to a Posting List, Keep Significant Keys**

In [16]:
def reduce_word_counts(unsorted_pl):
  ''' Returns a sorted posting list by wiki_id.
  Parameters:
  -----------
    unsorted_pl: list of tuples
      A list of (wiki_id, tf) tuples
  Returns:
  --------
    list of tuples
      A sorted posting list.
  '''
  sorted_pl = sorted(unsorted_pl, key=lambda x: x[0])
  return sorted_pl

**Calculate DF (W2DF) Object and Term Index**

In [17]:
from math import log

def TermStats(postings, N):
    ''' Takes a posting list RDD and calculate the df and idf for each token.
    Parameters:
    -----------
      postings: RDD
        An RDD where each element is a (token, posting_list) pair.
      N: int
        Total number of documents in the corpus.
    Returns:
    --------
      Dictionary1
        An Dict where each element is a (token, (unique_index, idf)) pair. index si unique across all corpus vocab
      Dictionary2
        sub Dict of term: DF (df_rdd) for reading from the Index, not needed for methodological retrieval object
    '''
    df_rdd = postings.map(lambda x: (x[0], len(x[1])))
    idf_rdd = df_rdd.map(lambda x: (x[0], log(N / (x[1] + 1), 10)))  # Using log base 10 for IDF calculation

    # Assuming the rest of your original processing here...

    # Convert IDF RDD to a dictionary for easier index assignment
    idf_dict = idf_rdd.collectAsMap()

    # Assign an index to each term based on its position in the dictionary
    term_idf_mapping = {term: (index, idf) for index, (term, idf) in enumerate(idf_dict.items())}

    return term_idf_mapping, df_rdd.collectAsMap()

**Page Rank**

In [18]:
def generate_graph(pages):
    ''' Compute the directed graph generated by wiki links.
    Parameters:
    -----------
    pages: RDD
        An RDD where each row consists of one Wikipedia article with 'id' and
        'anchor_text'.
    Returns:
    --------
    edges: RDD
        An RDD where each row represents an edge in the directed graph created by
        the Wikipedia links. The first entry should be the source page id and the
        second entry is the destination page id. No duplicates should be present.
    vertices: RDD
        An RDD where each row represents a vertex (node) in the directed graph
        created by the Wikipedia links. No duplicates should be present.
    '''
    # Generate edges RDD using map and reduceByKey
    edges = pages.flatMap(lambda page: [(page.id, anchor.id) for anchor in page.anchor_text])
    edges = edges.map(lambda edge: (edge, None)).reduceByKey(lambda x, _: x).keys()

    # Generate vertices RDD using flatMap and distinct
    vertices = edges.flatMap(lambda edge: [(edge[0],), (edge[1],)]).distinct()

    return edges, vertices

**Create Additional Info Component to Indices**

In [19]:
def AdditionalInfoDict(doc_text_pairs_title, doc_sizes):
    '''
    Creates a dictionary with document ID as keys and tuples of (title, doc_size) as values.

    Parameters:
    -----------
    doc_text_pairs_title: RDD
        An RDD of tuples (title, doc_id).
    doc_sizes: RDD
        An RDD of tuples (doc_id, doc_size).

    Returns:
    --------
    dict
        A dictionary where each key is a doc_id and each value is a tuple (title, doc_size).
    '''

    # Convert doc_text_pairs_title to have doc_id as key
    doc_id_with_title = doc_text_pairs_title.map(lambda x: (x[1], x[0]))  # (doc_id, title)

    # Join doc_id_with_title RDD with doc_sizes RDD based on doc_id
    joined_rdd = doc_id_with_title.join(doc_sizes)  # ((doc_id, (title, doc_size)))

    # Transform the joined RDD into the desired dictionary format
    result_dict = joined_rdd.map(lambda x: (x[0], (x[1][0], x[1][1]))).collectAsMap()

    return result_dict

**Create Doc TF-IDF Sparse Vector**

In [20]:
def CalculateDocTFIDF_Vectors(postings_rdd, term_idf_mapping, doc_additional_info):
    '''
    This function is desined to create a document sparse vector, filled with TF-IDF values, 
    used in order to calculate CosineSimilarity. In reality, this method was used for the title index only.
    '''
    # Broadcast the term_idf_mapping and doc_additional_info to all workers
    term_idf_mapping_bc = sc.broadcast(term_idf_mapping)
    doc_additional_info_bc = sc.broadcast(doc_additional_info)

    # Calculate TF-IDF scores, including normalization by document size
    # Attempting repartition to allow fixed size partitions to be processed
    tfidf_normalized_rdd = postings_rdd.repartition(1000).flatMap(lambda x: [
        ((doc_id, term_idf_mapping_bc.value[x[0]][0]),  # Use term to get (index, idf) and select index
         (tf * term_idf_mapping_bc.value[x[0]][1]) / doc_additional_info_bc.value[doc_id][1])  # Multiply TF by IDF and divide by doc_size
        for doc_id, tf in x[1]  # x[1] is a list of tuples (doc_id, not_norm_tf)
        if x[0] in term_idf_mapping_bc.value  # Check if the term is in the term_idf_mapping
    ]).reduceByKey(lambda x, y: x + y)

    # Convert the RDD to a format of (doc_id, {term_index: normalized_tfidf_score, ...})
    doc_normalized_tfidf_vectors_rdd = tfidf_normalized_rdd.map(lambda x: (x[0][0], (x[0][1], x[1])))\
                                                            .groupByKey()\
                                                            .mapValues(lambda x: dict(x))  # Convert Iterable to Dict for each doc_id

    return doc_normalized_tfidf_vectors_rdd.collectAsMap()

**Write Indices to Bucket**

In [24]:
NUM_BUCKETS = 124
def token2bucket_id(token):
  return int(_hash(token),16) % NUM_BUCKETS

def partition_postings_and_write(postings, BASE_DIR, bucket_name = bucket_name):
    ''' A function that partitions the posting lists into buckets, writes out
    all posting lists in a bucket to disk, and returns the posting locations for
    each bucket. Partitioning should be done through the use of `token2bucket`
    above. Writing to disk should use the function  `write_a_posting_list`, a
    static method implemented in inverted_index_colab.py under the InvertedIndex
    class.
    Parameters:
    -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    Returns:
    --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and
      offsets its posting list was written to. See `write_a_posting_list` for
      more details.
    '''
    # YOUR CODE HERE
    # Step 1: Map postings to buckets
    postings = postings.map(lambda x: (token2bucket_id(x[0]), x))

    # Step 2: Group postings by bucket
    postings_grouped = postings.groupByKey().mapValues(list)

    # Step 3: Write postings for each bucket to disk and collect location info
    def write_bucket_postings(bucket_postings):
      bucket_id, postings_list = bucket_postings
      # Prepare the argument for write_a_posting_list
      # This single argument is a tuple containing the bucket ID and the list of postings
      b_w_pl = (bucket_id, postings_list)
      # Call write_a_posting_list with the correctly formatted argument
      posting_locs = InvertedIndex.write_a_posting_list(b_w_pl, BASE_DIR, bucket_name)
      return posting_locs

    # Step 4: Apply write_bucket_postings to each group of postings
    posting_locations_rdd = postings_grouped.map(write_bucket_postings)

    return posting_locations_rdd

**Build PageViews (Built once, used externally)**

In [25]:
# Using user page views (as opposed to spiders and automated traffic) for the
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path)
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB)
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly
# total number of page views (5). Then, remove lines with article id or page
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the pages views for the same
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
  for line in f:
    parts = line.split(' ')
    wid2pv.update({int(parts[0]): int(parts[1])})
    
# pv_rdd = sc.parallelize(wid2pv.items())
# write out the counter as binary file (pickle it)
with open(pv_clean, 'wb') as f:
  pickle.dump(wid2pv, f)

**Build PageRank (Built once, used externally)**

In [31]:
# Calculate PageRank
pages_links = parquetFile.select("id", "anchor_text").rdd
# construct the graph 
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.rdd.map(lambda x: (x.id, x.pagerank))

pr_collected = pr.collect()
with open("PageRank_dict.pkl", "wb") as f:
    pickle.dump(pr_collected, f)

**Build Index**

In [29]:
# Import pickle files to construct the index part-by-part

# This part allows to take premade pkl files and load them to the work env in order to lighten
# the calculation and build the index


# pickle_file_path = '/home/dataproc/TermData.pkl'

# # Load the pickle file
# with open(pickle_file_path, 'rb') as pickle_file:
#     TermData = pickle.load(pickle_file)
    

# pickle_file_path = '/home/dataproc/W2DF.pkl'

# # Load the pickle file
# with open(pickle_file_path, 'rb') as pickle_file:
#     w2df = pickle.load(pickle_file)
    

# pickle_file_path = '/home/dataproc/Additional_Info.pkl'

# # Load the pickle file
# with open(pickle_file_path, 'rb') as pickle_file:
#     additional_info_dict = pickle.load(pickle_file)

# AVG_doc_size = 2.686965321606386 # Title
# AVG_doc_size = 365.655046299286  # Body

# N = 6348910


# pickle_file_path = '/home/dataproc/Doc_Vectors_Title.pkl'

# # Load the pickle file
# with open(pickle_file_path, 'rb') as pickle_file:
#     Doc_Vectors = pickle.load(pickle_file)


# pickle_file_path = '/home/dataproc/Posting_Locs_Body.pkl'

# # Load the pickle file
# with open(pickle_file_path, 'rb') as pickle_file:
#     super_posting_locs = pickle.load(pickle_file)

In [30]:
INDEX_NAME_T = 'Index_Title_Final_Corrected'
BASE_DIR_T = 'Index_Title_Final_Corrected'

# time the index creation time
t_start = time()
print('Creating attributes')
print('Creating posting')
# Preprocess, sort and filter doc_text_pairs for all indices and create the final posting list
word_counts = doc_text_pairs_title.flatMap(lambda x: ParseDocumentToPosting(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
postings = postings.filter(lambda x: len(x[1])>10)

print('Creating docsize')
# build docsize - for index additional info
doc_sizes = doc_text_pairs_title.map(lambda x: ParseDocumentToDocSize(x[0], x[1]))

# Get AVG doc size:
AVG_doc_size = (doc_sizes.map(lambda x: x[1]).reduce(lambda a, b: a + b)) / N
print('Avg doc size = ', AVG_doc_size)

print('Creating term data')
# global statistics per term
TermData ,w2df = TermStats(postings,N)

with open("TermData.pkl", "wb") as f:
    pickle.dump(TermData, f)
with open("W2DF.pkl", "wb") as f:
    pickle.dump(w2df, f)

print('Creating doc data')
# Additional Info (Docs) Object
additional_info_dict = AdditionalInfoDict(doc_text_pairs_title, doc_sizes)

with open("Additional_Info.pkl", "wb") as f:
    pickle.dump(additional_info_dict, f)

print('Creating doc vectors')
# Document TF-IDF vectors
Doc_Vectors = CalculateDocTFIDF_Vectors(postings, TermData, additional_info_dict)
with open("Doc_Vectors_Title.pkl", "wb") as f:
    pickle.dump(Doc_Vectors, f)

# partition posting lists and write out
# Title
print('Writing PostingList to Bucket')

_ = partition_postings_and_write(postings, BASE_DIR_T).collect()

# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=BASE_DIR_T):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

with open("Posting_Locs_Title.pkl", "wb") as f:
    pickle.dump(super_posting_locs, f)

print('Building Index')
inverted = InvertedIndex()

inverted.posting_locs = super_posting_locs
inverted.df = w2df
inverted.term_data = TermData
inverted.additional_info = additional_info_dict
inverted.doc_vectors = Doc_Vectors
inverted.N = N
inverted.AVG_doc_length = AVG_doc_size

print('Writing Index')
# write the global stats out
inverted.write_index('.', INDEX_NAME_T)
# upload to gs
index_src = f"{INDEX_NAME_T}.pkl"

index_dst = f'gs://{bucket_name}/{BASE_DIR_T}/{index_src}'
!gsutil cp $index_src $index_dst
print('Index creation Time (Title): ',(time() - t_start)/60, ' Minutes')

Creating attributes
Creating posting
Creating docsize


                                                                                

Avg doc size =  2.686965321606386
Writing PostingList to Bucket


                                                                                

In [34]:
INDEX_NAME_B = 'Index_Body_Final_Corrected'
BASE_DIR_B = 'Index_Body_Final_Corrected'

# time the index creation time
t_start = time()
print('Creating attributes')
print('Creating posting')
# Preprocess, sort and filter doc_text_pairs for all indices and create the final posting list
word_counts = doc_text_pairs_body.flatMap(lambda x: ParseDocumentToPosting(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
postings = postings.filter(lambda x: len(x[1])>50)

print('Creating docsize')
# build docsize - for index additional info
doc_sizes = doc_text_pairs_body.map(lambda x: ParseDocumentToDocSize(x[0], x[1]))

# Get AVG doc size:
AVG_doc_size = (doc_sizes.map(lambda x: x[1]).reduce(lambda a, b: a + b)) / N
print(AVG_doc_size)

# print('Time so far: ', (time() - t_start)/60, ' minutes')
print('Creating term data')
# global statistics per index
TermData ,w2df = TermStats(postings,N)

with open("TermData.pkl", "wb") as f:
    pickle.dump(TermData, f)
with open("W2DF.pkl", "wb") as f:
    pickle.dump(w2df, f)

# print('Time so far: ', (time() - t_start)/60, ' minutes')
print('Creating doc data')
# Additional Info (Docs) Object
additional_info_dict = AdditionalInfoDict(doc_text_pairs_title, doc_sizes)  # Will hold title, not body

with open("Additional_Info.pkl", "wb") as f:
    pickle.dump(additional_info_dict, f)

partition posting lists and write out
# Body
print('Time so far: ', (time() - t_start)/60, ' minutes')
print('Writing PostingList to Bucket')
_ = partition_postings_and_write(postings, BASE_DIR_B).collect()

# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=BASE_DIR_B):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)
    
with open("Posting_Locs_Body.pkl", "wb") as f:
    pickle.dump(super_posting_locs, f)

print('Building Index')
inverted = InvertedIndex()

inverted.posting_locs = super_posting_locs
inverted.df = w2df
inverted.additional_info = additional_info_dict
inverted.term_data = TermData
inverted.N = N
inverted.AVG_doc_length = AVG_doc_size

print('Time so far: ', (time() - t_start)/60, ' minutes')
print('Writing Index')
# write the global stats out
inverted.write_index('.', INDEX_NAME_B)
# upload to gs
index_src = f"{INDEX_NAME_B}.pkl"

index_dst = f'gs://{bucket_name}/{BASE_DIR_B}/{index_src}'
!gsutil cp $index_src $index_dst

print('Index creation Time (Body): ',(time() - t_start)/60, ' Minutes')

Building Index
Time so far:  4.53790028889974e-06  minutes
Writing Index
Copying file://Index_Body_Final_Corrected.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 18.8 MiB/ 18.8 MiB]                                                
Operation completed over 1 objects/18.8 MiB.                                     
Index creation Time (Body):  0.035965132713317874  Minutes
